qibft consensus and support for qibftBlock fork block

This commit is contained in:
Jitendra Bhurat 2020-05-28 11:39:30 -04:00
parent 282b74045a
commit 8044d33d40
39 changed files with 5101 additions and 2 deletions

View File

@ -1959,6 +1959,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node, useExist bool) (chain *core.B
}
istanbulConfig.ProposerPolicy = istanbul.ProposerPolicy(config.Istanbul.ProposerPolicy)
istanbulConfig.Ceil2Nby3Block = config.Istanbul.Ceil2Nby3Block
istanbulConfig.QibftBlock = config.Istanbul.QibftBlock
engine = istanbulBackend.New(istanbulConfig, stack.GetNodeKey(), chainDb)
} else if config.IsQuorum {
// for Raft

View File

@ -72,4 +72,10 @@ type Backend interface {
HasBadProposal(hash common.Hash) bool
Close() error
// IsQIBFTConsensus checks qibftBlock fork block and returns if it should be enabled
IsQIBFTConsensus() bool
// StartQBFTConsensus stops existing legacy ibft consensus and starts the new qibft consensus
StartQIBFTConsensus() error
}

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
qibftCore "github.com/ethereum/go-ethereum/consensus/qibft/core"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -61,7 +62,11 @@ func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Databas
recentMessages: recentMessages,
knownMessages: knownMessages,
}
backend.core = istanbulCore.New(backend, backend.config)
if backend.IsQIBFTConsensus() {
backend.core = qibftCore.New(backend, backend.config)
} else {
backend.core = istanbulCore.New(backend, backend.config)
}
return backend
}
@ -98,6 +103,8 @@ type backend struct {
recentMessages *lru.ARCCache // the cache of peer's messages
knownMessages *lru.ARCCache // the cache of self messages
qibftConsensusEnabled bool // qibft consensus
}
// zekun: HACK
@ -317,3 +324,41 @@ func (sb *backend) HasBadProposal(hash common.Hash) bool {
func (sb *backend) Close() error {
return nil
}
// IsQIBFTConsensus returns whether qbft consensus should be used
func (sb *backend) IsQIBFTConsensus() bool {
if sb.qibftConsensusEnabled {
return true
}
// If it is a new network then by default use qibft consensus
if sb.config.QibftBlock == nil || sb.config.QibftBlock.Uint64() == 0 {
return true
}
if sb.chain != nil && sb.chain.CurrentHeader().Number.Cmp(sb.config.QibftBlock) >= 0 {
return true
}
return false
}
func (sb *backend) StartQIBFTConsensus() error {
sb.logger.Trace("Starting QIBFT Consensus")
if err := sb.Stop(); err != nil {
return err
}
sb.logger.Trace("Stopped legacy IBFT consensus")
sb.coreMu.Lock()
defer sb.coreMu.Unlock()
// Set the core to qibft
sb.core = qibftCore.New(sb, sb.config)
sb.logger.Trace("Starting qibft")
if err := sb.core.Start(); err != nil {
return err
}
sb.logger.Trace("Started qibft consensus")
sb.coreStarted = true
sb.qibftConsensusEnabled = true
return nil
}

View File

@ -535,6 +535,12 @@ func (sb *backend) Start(chain consensus.ChainReader, currentBlock func() *types
}
sb.coreStarted = true
if sb.IsQIBFTConsensus() {
sb.logger.Trace("Started qibft consensus")
sb.qibftConsensusEnabled = true
}
return nil
}
@ -549,6 +555,11 @@ func (sb *backend) Stop() error {
return err
}
sb.coreStarted = false
if sb.IsQIBFTConsensus() {
sb.qibftConsensusEnabled = false
}
return nil
}

View File

@ -46,6 +46,8 @@ func newBlockChain(n int) (*core.BlockChain, *backend) {
config := istanbul.DefaultConfig
// Use the first key as private key
b, _ := New(config, nodeKeys[0], memDB).(*backend)
// set the qibft consensus enabled to true
b.qibftConsensusEnabled = true
genesis.MustCommit(memDB)
blockchain, err := core.NewBlockChain(memDB, nil, genesis.Config, b, vm.Config{}, nil)
if err != nil {

View File

@ -32,6 +32,7 @@ type Config struct {
Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes
Ceil2Nby3Block *big.Int `toml:",omitempty"` // Number of confirmations required to move from one state to next [2F + 1 to Ceil(2N/3)]
AllowedFutureBlockTime uint64 `toml:",omitempty"` // Max time (in seconds) from current time allowed for blocks, before they're considered future blocks
QibftBlock *big.Int `toml:",omitempty"` // Fork block at which block confirmations are done using qibft consensus instead of ibft
}
var DefaultConfig = &Config{
@ -41,4 +42,5 @@ var DefaultConfig = &Config{
Epoch: 30000,
Ceil2Nby3Block: big.NewInt(0),
AllowedFutureBlockTime: 0,
QibftBlock: big.NewInt(0),
}

View File

@ -197,6 +197,17 @@ func (c *core) startNewRound(round *big.Int) {
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
}
logger.Trace("Start new ibft round")
// If new round is 0, then check if qibftConsensus needs to be enabled
if round.Uint64() == 0 && c.backend.IsQIBFTConsensus() {
logger.Trace("Starting qibft consensus as qibftBlock has passed")
if err := c.backend.StartQIBFTConsensus(); err != nil {
// TODO Do we panic or continue and try to start qibft when processing the next sequence
logger.Error("Unable to start QIBFT Consensus, retrying for the next block", "error", err)
}
}
roundChange := false
// Try to get last proposal
lastProposal, lastProposer := c.backend.LastProposal()

View File

@ -161,6 +161,14 @@ func (sb *testSystemBackend) Close() error {
return nil
}
func (sb *testSystemBackend) IsQIBFTConsensus() bool {
return true
}
func (sb *testSystemBackend) StartQIBFTConsensus() error {
return nil
}
// ==============================================
//
// define the struct that need to be provided for integration tests.

View File

@ -0,0 +1,196 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
// msgPriority is defined for calculating processing priority to speedup consensus
// msgPreprepare > msgCommit > msgPrepare
msgPriority = map[uint64]int{
msgPreprepare: 1,
msgCommit: 2,
msgPrepare: 3,
}
)
// checkMessage checks the message state
// return errInvalidMessage if the message is invalid
// return errFutureMessage if the message view is larger than current view
// return errOldMessage if the message view is smaller than current view
func (c *core) checkMessage(msgCode uint64, view *View) error {
if view == nil || view.Sequence == nil || view.Round == nil {
return errInvalidMessage
}
if msgCode == msgRoundChange {
if view.Sequence.Cmp(c.currentView().Sequence) > 0 {
return errFutureMessage
} else if view.Cmp(c.currentView()) < 0 {
return errOldMessage
}
return nil
}
if view.Cmp(c.currentView()) > 0 {
return errFutureMessage
}
if view.Cmp(c.currentView()) < 0 {
return errOldMessage
}
// StateAcceptRequest only accepts msgPreprepare
// other messages are future messages
if c.state == StateAcceptRequest {
if msgCode > msgPreprepare {
return errFutureMessage
}
return nil
}
// For states(StatePreprepared, StatePrepared, StateCommitted),
// can accept all message types if processing with same view
return nil
}
func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
logger := c.logger.New("from", src, "state", c.state)
if src.Address() == c.Address() {
logger.Warn("Backlog from self")
return
}
logger.Trace("Store future message")
c.backlogsMu.Lock()
defer c.backlogsMu.Unlock()
logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))
backlog := c.backlogs[src.Address()]
if backlog == nil {
backlog = prque.New()
}
switch msg.Code {
case msgPreprepare:
var p *Preprepare
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
case msgRoundChange:
var p *RoundChangeMessage
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
// for msgPrepare and msgCommit cases
default:
var p *Subject
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
}
c.backlogs[src.Address()] = backlog
}
func (c *core) processBacklog() {
c.backlogsMu.Lock()
defer c.backlogsMu.Unlock()
for srcAddress, backlog := range c.backlogs {
if backlog == nil {
continue
}
_, src := c.valSet.GetByAddress(srcAddress)
if src == nil {
// validator is not available
delete(c.backlogs, srcAddress)
continue
}
logger := c.logger.New("from", src, "state", c.state)
isFuture := false
// We stop processing if
// 1. backlog is empty
// 2. The first message in queue is a future message
for !(backlog.Empty() || isFuture) {
m, prio := backlog.Pop()
msg := m.(*message)
var view *View
switch msg.Code {
case msgPreprepare:
var m *Preprepare
err := msg.Decode(&m)
if err == nil {
view = m.View
}
case msgRoundChange:
var rc *RoundChangeMessage
err := msg.Decode(&rc)
if err == nil {
view = rc.View
}
// for msgPrepare and msgCommit cases
default:
var sub *Subject
err := msg.Decode(&sub)
if err == nil {
view = sub.View
}
}
if view == nil {
logger.Debug("Nil view", "msg", msg)
continue
}
// Push back if it's a future message
err := c.checkMessage(msg.Code, view)
if err != nil {
if err == errFutureMessage {
logger.Trace("Stop processing backlog", "msg", msg)
backlog.Push(msg, prio)
isFuture = true
break
}
logger.Trace("Skip the backlog event", "msg", msg, "err", err)
continue
}
logger.Trace("Post backlog event", "msg", msg)
go c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
}
}
}
func toPriority(msgCode uint64, view *View) float32 {
if msgCode == msgRoundChange {
// For msgRoundChange, set the message priority based on its sequence
return -float32(view.Sequence.Uint64() * 1000)
}
// FIXME: round will be reset as 0 while new sequence
// 10 * Round limits the range of message code is from 0 to 9
// 1000 * Sequence limits the range of round is from 0 to 99
return -float32(view.Sequence.Uint64()*1000 + view.Round.Uint64()*10 + uint64(msgPriority[msgCode]))
}

View File

@ -0,0 +1,390 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
"math/big"
"reflect"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
func TestCheckMessage(t *testing.T) {
c := &core{
state: StateAcceptRequest,
current: newRoundState(&View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil, nil, nil),
}
// invalid view format
err := c.checkMessage(msgPreprepare, nil)
if err != errInvalidMessage {
t.Errorf("error mismatch: have %v, want %v", err, errInvalidMessage)
}
testStates := []State{StateAcceptRequest, StatePreprepared, StatePrepared, StateCommitted}
testCode := []uint64{msgPreprepare, msgPrepare, msgCommit, msgRoundChange}
// future sequence
v := &View{
Sequence: big.NewInt(2),
Round: big.NewInt(0),
}
for i := 0; i < len(testStates); i++ {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
}
// future round
v = &View{
Sequence: big.NewInt(1),
Round: big.NewInt(1),
}
for i := 0; i < len(testStates); i++ {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if testCode[j] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
}
// current view but waiting for round change
v = &View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}
for i := 0; i < len(testStates); i++ {
c.state = testStates[i]
for j := 0; j < len(testCode); j++ {
err := c.checkMessage(testCode[j], v)
if testCode[j] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if testStates[i] == StateAcceptRequest && testCode[j] > msgPreprepare {
if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want %v", err, nil)
}
}
}
v = c.currentView()
// current view, state = StateAcceptRequest
c.state = StateAcceptRequest
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if testCode[i] == msgPreprepare {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else {
if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
}
}
// current view, state = StatePreprepared
c.state = StatePreprepared
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
// current view, state = StatePrepared
c.state = StatePrepared
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
// current view, state = StateCommitted
c.state = StateCommitted
for i := 0; i < len(testCode); i++ {
err = c.checkMessage(testCode[i], v)
if testCode[i] == msgRoundChange {
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
} else if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
}
func TestStoreBacklog(t *testing.T) {
c := &core{
logger: log.New("backend", "test", "id", 0),
valSet: newTestValidatorSet(1),
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex),
}
v := &View{
Round: big.NewInt(10),
Sequence: big.NewInt(10),
}
p := c.valSet.GetByIndex(0)
// push preprepare msg
proposal := makeBlock(1)
preprepare := &Preprepare{
View: v,
Proposal: proposal,
PreparedMessages: newMessageSet(c.valSet),
RCMessages: newMessageSet(c.valSet),
}
prepreparePayload, _ := Encode(preprepare)
m := &message{
Code: msgPreprepare,
Msg: prepreparePayload,
}
c.storeBacklog(m, p)
msg := c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
preparedBlock := newTestProposal()
// push prepare msg
subject := &Subject{
View: v,
Digest: preparedBlock,
}
subjectPayload, _ := Encode(subject)
// round change message
rcMessage := &RoundChangeMessage{
View: v,
PreparedRound: v.Round,
PreparedBlock: preparedBlock,
}
rcPayload, _ := Encode(rcMessage)
m = &message{
Code: msgPrepare,
Msg: subjectPayload,
}
c.storeBacklog(m, p)
msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
// push commit msg
m = &message{
Code: msgCommit,
Msg: subjectPayload,
}
c.storeBacklog(m, p)
msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
// push roundChange msg
m = &message{
Code: msgRoundChange,
Msg: rcPayload,
}
c.storeBacklog(m, p)
msg = c.backlogs[p.Address()].PopItem()
if !reflect.DeepEqual(msg, m) {
t.Errorf("message mismatch: have %v, want %v", msg, m)
}
}
func TestProcessFutureBacklog(t *testing.T) {
backend := &testSystemBackend{
events: new(event.TypeMux),
}
c := &core{
logger: log.New("backend", "test", "id", 0),
valSet: newTestValidatorSet(1),
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex),
backend: backend,
current: newRoundState(&View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil, nil, nil),
state: StateAcceptRequest,
}
c.subscribeEvents()
defer c.unsubscribeEvents()
v := &View{
Round: big.NewInt(10),
Sequence: big.NewInt(10),
}
p := c.valSet.GetByIndex(0)
// push a future msg
subject := &Subject{
View: v,
Digest: makeBlock(5),
}
subjectPayload, _ := Encode(subject)
m := &message{
Code: msgCommit,
Msg: subjectPayload,
}
c.storeBacklog(m, p)
c.processBacklog()
const timeoutDura = 2 * time.Second
timeout := time.NewTimer(timeoutDura)
select {
case e, ok := <-c.events.Chan():
if !ok {
return
}
t.Errorf("unexpected events comes: %v", e)
case <-timeout.C:
// success
}
}
func TestProcessBacklog(t *testing.T) {
vset := newTestValidatorSet(1)
v := &View{
Round: big.NewInt(0),
Sequence: big.NewInt(1),
}
proposal := makeBlock(1)
preprepare := &Preprepare{
View: v,
Proposal: proposal,
PreparedMessages: newMessageSet(vset),
RCMessages: newMessageSet(vset),
}
prepreparePayload, _ := Encode(preprepare)
subject := &Subject{
View: v,
Digest: makeBlock(5),
}
subjectPayload, _ := Encode(subject)
roundChange := &RoundChangeMessage{
View: v,
PreparedRound: v.Round,
PreparedBlock: makeBlock(5),
}
roundChangePayload, _ := Encode(roundChange)
msgs := []*message{
&message{
Code: msgPreprepare,
Msg: prepreparePayload,
},
&message{
Code: msgPrepare,
Msg: subjectPayload,
},
&message{
Code: msgCommit,
Msg: subjectPayload,
},
&message{
Code: msgRoundChange,
Msg: roundChangePayload,
},
}
for i := 0; i < len(msgs); i++ {
testProcessBacklog(t, msgs[i], vset)
}
}
func testProcessBacklog(t *testing.T, msg *message, vset istanbul.ValidatorSet) {
backend := &testSystemBackend{
events: new(event.TypeMux),
peers: vset,
}
c := &core{
logger: log.New("backend", "test", "id", 0),
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex),
valSet: vset,
backend: backend,
state: State(msg.Code),
current: newRoundState(&View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil, nil, nil),
}
c.subscribeEvents()
defer c.unsubscribeEvents()
c.storeBacklog(msg, vset.GetByIndex(0))
c.processBacklog()
const timeoutDura = 2 * time.Second
timeout := time.NewTimer(timeoutDura)
select {
case ev := <-c.events.Chan():
e, ok := ev.Data.(backlogEvent)
if !ok {
t.Errorf("unexpected event comes: %v", reflect.TypeOf(ev.Data))
}
if e.msg.Code != msg.Code {
t.Errorf("message code mismatch: have %v, want %v", e.msg.Code, msg.Code)
}
// success
case <-timeout.C:
t.Errorf("unexpected timeout occurs for msg: %v", msg.Code)
}
}

View File

@ -0,0 +1,104 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"reflect"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func (c *core) sendCommit() {
sub := c.current.Subject()
c.broadcastCommit(sub)
}
func (c *core) sendCommitForOldBlock(view *View, digest istanbul.Proposal) {
sub := &Subject{
View: view,
Digest: digest,
}
c.broadcastCommit(sub)
}
func (c *core) broadcastCommit(sub *Subject) {
logger := c.logger.New("state", c.state)
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
return
}
c.broadcast(&message{
Code: msgCommit,
Msg: encodedSubject,
})
}
func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// Decode COMMIT message
var commit *Subject
err := msg.Decode(&commit)
if err != nil {
return errFailedDecodeCommit
}
if err := c.checkMessage(msgCommit, commit.View); err != nil {
return err
}
if err := c.verifyCommit(commit, src); err != nil {
return err
}
c.acceptCommit(msg, src)
// Commit the proposal once we have enough COMMIT messages and we are not in the Committed state.
//
// If we already have a proposal, we may have chance to speed up the consensus process
// by committing the proposal without PREPARE messages.
if c.current.Commits.Size() >= c.QuorumSize() && c.state.Cmp(StateCommitted) < 0 {
c.commit()
}
return nil
}
// verifyCommit verifies if the received COMMIT message is equivalent to our subject
func (c *core) verifyCommit(commit *Subject, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
sub := c.current.Subject()
if !reflect.DeepEqual(commit.View, sub.View) || commit.Digest.Hash().Hex() != sub.Digest.Hash().Hex() {
logger.Warn("Inconsistent subjects between commit and proposal", "expected", sub, "got", commit)
return errInconsistentSubject
}
return nil
}
func (c *core) acceptCommit(msg *message, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
// Add the COMMIT message to current round state
if err := c.current.Commits.Add(msg); err != nil {
logger.Error("Failed to record commit message", "msg", msg, "err", err)
return err
}
return nil
}

View File

@ -0,0 +1,316 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"bytes"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/crypto"
)
func TestHandleCommit(t *testing.T) {
N := uint64(4)
F := uint64(1)
proposal := newTestProposal()
expectedSubject := &Subject{
View: &View{
Round: big.NewInt(0),
Sequence: proposal.Number(),
},
Digest: proposal,
}
testCases := []struct {
system *testSystem
expectedErr error
}{
{
// normal case
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(1),
},
c.valSet,
)
if i == 0 {
// replica 0 is the proposer
c.state = StatePrepared
}
}
return sys
}(),
nil,
},
{
// future message
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i == 0 {
// replica 0 is the proposer
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
c.state = StatePreprepared
} else {
c.current = newTestRoundState(
&View{
Round: big.NewInt(2),
Sequence: big.NewInt(3),
},
c.valSet,
)
}
}
return sys
}(),
errFutureMessage,
},
{
// subject not match
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i == 0 {
// replica 0 is the proposer
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
c.state = StatePreprepared
} else {
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(0),
},
c.valSet,
)
}
}
return sys
}(),
errOldMessage,
},
{
// jump state
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: proposal.Number(),
},
c.valSet,
)
// only replica0 stays at StatePreprepared
// other replicas are at StatePrepared
if i != 0 {
c.state = StatePrepared
} else {
c.state = StatePreprepared
}
}
return sys
}(),
nil,
},
// TODO: double send message
}
OUTER:
for _, test := range testCases {
test.system.Run(false)
v0 := test.system.backends[0]
r0 := v0.engine.(*core)
for i, v := range test.system.backends {
validator := r0.valSet.GetByIndex(uint64(i))
m, _ := Encode(v.engine.(*core).current.Subject())
if err := r0.handleCommit(&message{
Code: msgCommit,
Msg: m,
Address: validator.Address(),
Signature: []byte{},
CommittedSeal: validator.Address().Bytes(), // small hack
}, validator); err != nil {
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
continue OUTER
}
}
// prepared is normal case
if r0.state != StateCommitted {
// There are not enough commit messages in core
if r0.state != StatePrepared {
t.Errorf("state mismatch: have %v, want %v", r0.state, StatePrepared)
}
if r0.current.Commits.Size() >= r0.QuorumSize() {
t.Errorf("the size of commit messages should be less than %v", r0.QuorumSize())
}
continue
}
// core should have 2F+1 before Ceil2Nby3Block or Ceil(2N/3) prepare messages
if r0.current.Commits.Size() < r0.QuorumSize() {
t.Errorf("the size of commit messages should be larger than 2F+1 or Ceil(2N/3): size %v", r0.QuorumSize())
}
// check signatures large than F
signedCount := 0
committedSeals := v0.committedMsgs[0].committedSeals
for _, validator := range r0.valSet.List() {
for _, seal := range committedSeals {
if bytes.Compare(validator.Address().Bytes(), seal[:common.AddressLength]) == 0 {
signedCount++
break
}
}
}
if signedCount <= r0.valSet.F() {
t.Errorf("the expected signed count should be larger than %v, but got %v", r0.valSet.F(), signedCount)
}
}
}
// round is not checked for now
func TestVerifyCommit(t *testing.T) {
// for log purpose
privateKey, _ := crypto.GenerateKey()
peer := validator.New(getPublicKeyAddress(privateKey))
valSet := validator.NewSet([]common.Address{peer.Address()}, istanbul.RoundRobin)
sys := NewTestSystemWithBackend(uint64(1), uint64(0))
testCases := []struct {
expected error
commit *Subject
roundState *roundState
}{
{
// normal case
expected: nil,
commit: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
{
// old message
expected: errInconsistentSubject,
commit: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// different digest
expected: errInconsistentSubject,
commit: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: makeBlock(5),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// malicious package(lack of sequence)
expected: errInconsistentSubject,
commit: &Subject{
View: &View{Round: big.NewInt(0), Sequence: nil},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// wrong prepare message with same sequence but different round
expected: errInconsistentSubject,
commit: &Subject{
View: &View{Round: big.NewInt(1), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
{
// wrong prepare message with same round but different sequence
expected: errInconsistentSubject,
commit: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(1)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
}
for i, test := range testCases {
c := sys.backends[0].engine.(*core)
c.current = test.roundState
if err := c.verifyCommit(test.commit, peer); err != nil {
if err != test.expected {
t.Errorf("result %d: error mismatch: have %v, want %v", i, err, test.expected)
}
}
}
}

View File

@ -0,0 +1,369 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"bytes"
"math"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
metrics "github.com/ethereum/go-ethereum/metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
// New creates an Istanbul consensus core
func New(backend istanbul.Backend, config *istanbul.Config) Engine {
r := metrics.NewRegistry()
c := &core{
config: config,
address: backend.Address(),
state: StateAcceptRequest,
handlerWg: new(sync.WaitGroup),
logger: log.New("address", backend.Address()),
backend: backend,
backlogs: make(map[common.Address]*prque.Prque),
backlogsMu: new(sync.Mutex),
pendingRequests: prque.New(),
pendingRequestsMu: new(sync.Mutex),
consensusTimestamp: time.Time{},
roundMeter: metrics.NewMeter(),
sequenceMeter: metrics.NewMeter(),
consensusTimer: metrics.NewTimer(),
}
// During upgrades we do not want to re-register round, sequence and consensus metrics
// Do not have to register them, if they are already registered
if roundMeter := r.Get("consensus/istanbul/core/round"); roundMeter == nil {
r.Register("consensus/istanbul/core/round", c.roundMeter)
}
if sequenceMeter := r.Get("consensus/istanbul/core/sequence"); sequenceMeter == nil {
r.Register("consensus/istanbul/core/sequence", c.sequenceMeter)
}
if consensusTimer := r.Get("consensus/istanbul/core/consensus"); consensusTimer == nil {
r.Register("consensus/istanbul/core/consensus", c.consensusTimer)
}
c.validateFn = c.checkValidatorSignature
return c
}
// ----------------------------------------------------------------------------
type core struct {
config *istanbul.Config
address common.Address
state State
logger log.Logger
backend istanbul.Backend
events *event.TypeMuxSubscription
finalCommittedSub *event.TypeMuxSubscription
timeoutSub *event.TypeMuxSubscription
futurePreprepareTimer *time.Timer
valSet istanbul.ValidatorSet
validateFn func([]byte, []byte) (common.Address, error)
backlogs map[common.Address]*prque.Prque
backlogsMu *sync.Mutex
current *roundState
handlerWg *sync.WaitGroup
roundChangeSet *roundChangeSet
roundChangeTimer *time.Timer
PreparedRoundPrepares *messageSet
pendingRequests *prque.Prque
pendingRequestsMu *sync.Mutex
consensusTimestamp time.Time
// the meter to record the round change rate
roundMeter metrics.Meter
// the meter to record the sequence update rate
sequenceMeter metrics.Meter
// the timer to record consensus duration (from accepting a preprepare to final committed stage)
consensusTimer metrics.Timer
}
func (c *core) finalizeMessage(msg *message) ([]byte, error) {
var err error
// Add sender address
msg.Address = c.Address()
// Add proof of consensus
msg.CommittedSeal = []byte{}
// Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
if msg.Code == msgCommit && c.current.Proposal() != nil {
seal := PrepareCommittedSeal(c.current.Proposal().Hash())
msg.CommittedSeal, err = c.backend.Sign(seal)
if err != nil {
return nil, err
}
}
// Sign message
data, err := msg.PayloadNoSig()
if err != nil {
return nil, err
}
msg.Signature, err = c.backend.Sign(data)
if err != nil {
return nil, err
}
// Convert to payload
payload, err := msg.Payload()
if err != nil {
return nil, err
}
return payload, nil
}
func (c *core) broadcast(msg *message) {
logger := c.logger.New("state", c.state)
payload, err := c.finalizeMessage(msg)
if err != nil {
logger.Error("Failed to finalize message", "msg", msg, "err", err)
return
}
// Broadcast payload
if err = c.backend.Broadcast(c.valSet, payload); err != nil {
logger.Error("Failed to broadcast message", "msg", msg, "err", err)
return
}
}
func (c *core) currentView() *View {
return &View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(c.current.Round()),
}
}
func (c *core) IsProposer() bool {
v := c.valSet
if v == nil {
return false
}
return v.IsProposer(c.backend.Address())
}
func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
return c.current != nil && c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
}
func (c *core) commit() {
c.setState(StateCommitted)
proposal := c.current.Proposal()
if proposal != nil {
committedSeals := make([][]byte, c.current.Commits.Size())
for i, v := range c.current.Commits.Values() {
committedSeals[i] = make([]byte, types.IstanbulExtraSeal)
copy(committedSeals[i][:], v.CommittedSeal[:])
}
if err := c.backend.Commit(proposal, committedSeals); err != nil {
c.sendNextRoundChange()
return
}
}
}
// startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
func (c *core) startNewRound(round *big.Int) {
var logger log.Logger
if c.current == nil {
logger = c.logger.New("old_round", -1, "old_seq", 0)
} else {
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
}
logger.Trace("Start new qibft round")
roundChange := false
// Try to get last proposal
lastProposal, lastProposer := c.backend.LastProposal()
if c.current == nil {
logger.Trace("Start to the initial round")
} else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
c.sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
if !c.consensusTimestamp.IsZero() {
c.consensusTimer.UpdateSince(c.consensusTimestamp)
c.consensusTimestamp = time.Time{}
}
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
} else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
if round.Cmp(common.Big0) == 0 {
// same seq and round, don't need to start new round
return
} else if round.Cmp(c.current.Round()) < 0 {
logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round())
return
}
roundChange = true
} else {
logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64())
return
}
var newView *View
if roundChange {
newView = &View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(round),
}
} else {
newView = &View{
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
Round: new(big.Int),
}
c.valSet = c.backend.Validators(lastProposal)
}
// Update logger
logger = logger.New("old_proposer", c.valSet.GetProposer())
// New snapshot for new round
c.updateRoundState(newView, c.valSet, roundChange)
// Calculate new proposer
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
c.setState(StateAcceptRequest)
if round.Cmp(c.current.Round()) > 0 {
c.roundMeter.Mark(new(big.Int).Sub(round, c.current.Round()).Int64())
}
// Update RoundChangeSet by deleting older round messages
if round.Uint64() == 0 {
c.PreparedRoundPrepares = nil
c.roundChangeSet = newRoundChangeSet(c.valSet)
} else {
// Clear earlier round messages
c.roundChangeSet.ClearLowerThan(round)
}
c.newRoundChangeTimer()
logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())
}
// updateRoundState updates round state by checking if locking block is necessary
func (c *core) updateRoundState(view *View, validatorSet istanbul.ValidatorSet, roundChange bool) {
if roundChange && c.current != nil {
c.current = newRoundState(view, validatorSet, c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
} else {
c.current = newRoundState(view, validatorSet, nil, nil, c.backend.HasBadProposal)
}
}
func (c *core) setState(state State) {
if c.state != state {
c.state = state
}
if state == StateAcceptRequest {
c.processPendingRequests()
}
c.processBacklog()
}
func (c *core) Address() common.Address {
return c.address
}
func (c *core) stopFuturePreprepareTimer() {
if c.futurePreprepareTimer != nil {
c.futurePreprepareTimer.Stop()
}
}
func (c *core) stopTimer() {
c.stopFuturePreprepareTimer()
if c.roundChangeTimer != nil {
c.roundChangeTimer.Stop()
}
}
func (c *core) newRoundChangeTimer() {
c.stopTimer()
// set timeout based on the round number
timeout := time.Duration(c.config.RequestTimeout) * time.Millisecond
round := c.current.Round().Uint64()
if round > 0 {
timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
}
c.roundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(timeoutEvent{})
})
}
func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
return istanbul.CheckValidatorSignature(c.valSet, data, sig)
}
func (c *core) QuorumSize() int {
if c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
c.logger.Trace("Confirmation Formula used 2F+ 1")
return (2 * c.valSet.F()) + 1
}
c.logger.Trace("Confirmation Formula used ceil(2N/3)")
return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
}
// PrepareCommittedSeal returns a committed seal for the given hash
func PrepareCommittedSeal(hash common.Hash) []byte {
var buf bytes.Buffer
buf.Write(hash.Bytes())
buf.Write([]byte{byte(msgCommit)})
return buf.Bytes()
}
// validateMsgSignature verifies if the message has a valid signature
func validateMsgSignature(messages map[common.Address]*message, validateFn func([]byte, []byte) (common.Address, error)) error {
for _, msg := range messages {
var payload []byte
payload, err := msg.PayloadNoSig()
if err != nil {
return err
}
signerAdd, err := validateFn(payload, msg.Signature)
if err != nil {
return err
}
if bytes.Compare(signerAdd.Bytes(), msg.Address.Bytes()) != 0 {
return errInvalidSigner
}
}
return nil
}

View File

@ -0,0 +1,96 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
elog "github.com/ethereum/go-ethereum/log"
)
func makeBlock(number int64) *types.Block {
header := &types.Header{
Difficulty: big.NewInt(0),
Number: big.NewInt(number),
GasLimit: 0,
GasUsed: 0,
Time: 0,
}
block := &types.Block{}
return block.WithSeal(header)
}
func newTestProposal() istanbul.Proposal {
return makeBlock(1)
}
func TestNewRequest(t *testing.T) {
testLogger.SetHandler(elog.StdoutHandler)
N := uint64(4)
F := uint64(1)
sys := NewTestSystemWithBackend(N, F)
close := sys.Run(true)
defer close()
request1 := makeBlock(1)
sys.backends[0].NewRequest(request1)
<-time.After(1 * time.Second)
request2 := makeBlock(2)
sys.backends[0].NewRequest(request2)
<-time.After(1 * time.Second)
for _, backend := range sys.backends {
if len(backend.committedMsgs) != 2 {
t.Errorf("the number of executed requests mismatch: have %v, want 2", len(backend.committedMsgs))
}
if !reflect.DeepEqual(request1.Number(), backend.committedMsgs[0].commitProposal.Number()) {
t.Errorf("the number of requests mismatch: have %v, want %v", request1.Number(), backend.committedMsgs[0].commitProposal.Number())
}
if !reflect.DeepEqual(request2.Number(), backend.committedMsgs[1].commitProposal.Number()) {
t.Errorf("the number of requests mismatch: have %v, want %v", request2.Number(), backend.committedMsgs[1].commitProposal.Number())
}
}
}
func TestQuorumSize(t *testing.T) {
N := uint64(4)
F := uint64(1)
sys := NewTestSystemWithBackend(N, F)
backend := sys.backends[0]
c := backend.engine.(*core)
valSet := c.valSet
for i := 1; i <= 1000; i++ {
valSet.AddValidator(common.StringToAddress(string(i)))
if 2*c.QuorumSize() <= (valSet.Size()+valSet.F()) || 2*c.QuorumSize() > (valSet.Size()+valSet.F()+2) {
t.Errorf("quorumSize constraint failed, expected value (2*QuorumSize > Size+F && 2*QuorumSize <= Size+F+2) to be:%v, got: %v, for size: %v", true, false, valSet.Size())
}
}
}

View File

@ -0,0 +1,46 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import "errors"
var (
// errInconsistentSubject is returned when received subject is different from
// current subject.
errInconsistentSubject = errors.New("inconsistent subjects")
// errNotFromProposer is returned when received message is supposed to be from
// proposer.
errNotFromProposer = errors.New("message does not come from proposer")
// errFutureMessage is returned when current view is earlier than the
// view of the received message.
errFutureMessage = errors.New("future message")
// errOldMessage is returned when the received message's view is earlier
// than current view.
errOldMessage = errors.New("old message")
// errInvalidMessage is returned when the message is malformed.
errInvalidMessage = errors.New("invalid message")
// errFailedDecodePreprepare is returned when the PRE-PREPARE message is malformed.
errFailedDecodePreprepare = errors.New("failed to decode PRE-PREPARE")
// errFailedDecodePrepare is returned when the PREPARE message is malformed.
errFailedDecodePrepare = errors.New("failed to decode PREPARE")
// errFailedDecodeCommit is returned when the COMMIT message is malformed.
errFailedDecodeCommit = errors.New("failed to decode COMMIT")
// errInvalidSigner is returned when the message is signed by a validator different than message sender
errInvalidSigner = errors.New("message not signed by the sender")
// errInvalidPreparedBlock is returned when prepared block is not validated in round change messages
errInvalidPreparedBlock = errors.New("invalid prepared block in round change messages")
)

View File

@ -0,0 +1,28 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
type backlogEvent struct {
src istanbul.Validator
msg *message
}
type timeoutEvent struct{}

View File

@ -0,0 +1,26 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import "github.com/ethereum/go-ethereum/common"
func (c *core) handleFinalCommitted() error {
logger := c.logger.New("state", c.state)
logger.Trace("Received a final committed proposal")
c.startNewRound(common.Big0)
return nil
}

View File

@ -0,0 +1,195 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
// Start implements core.Engine.Start
func (c *core) Start() error {
// Start a new round from last sequence + 1
c.startNewRound(common.Big0)
// Tests will handle events itself, so we have to make subscribeEvents()
// be able to call in test.
c.subscribeEvents()
go c.handleEvents()
return nil
}
// Stop implements core.Engine.Stop
func (c *core) Stop() error {
c.stopTimer()
c.unsubscribeEvents()
// Make sure the handler goroutine exits
c.handlerWg.Wait()
return nil
}
// ----------------------------------------------------------------------------
// Subscribe both internal and external events
func (c *core) subscribeEvents() {
c.events = c.backend.EventMux().Subscribe(
// external events
istanbul.RequestEvent{},
istanbul.MessageEvent{},
// internal events
backlogEvent{},
)
c.timeoutSub = c.backend.EventMux().Subscribe(
timeoutEvent{},
)
c.finalCommittedSub = c.backend.EventMux().Subscribe(
istanbul.FinalCommittedEvent{},
)
}
// Unsubscribe all events
func (c *core) unsubscribeEvents() {
c.events.Unsubscribe()
c.timeoutSub.Unsubscribe()
c.finalCommittedSub.Unsubscribe()
}
func (c *core) handleEvents() {
// Clear state
defer func() {
c.current = nil
c.handlerWg.Done()
}()
c.handlerWg.Add(1)
for {
select {
case event, ok := <-c.events.Chan():
if !ok {
return
}
// A real event arrived, process interesting content
switch ev := event.Data.(type) {
case istanbul.RequestEvent:
r := &Request{
Proposal: ev.Proposal,
}
err := c.handleRequest(r)
if err == errFutureMessage {
c.storeRequestMsg(r)
}
case istanbul.MessageEvent:
if err := c.handleMsg(ev.Payload); err == nil {
c.backend.Gossip(c.valSet, ev.Payload)
}
case backlogEvent:
// No need to check signature for internal messages
if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
p, err := ev.msg.Payload()
if err != nil {
c.logger.Warn("Get message payload failed", "err", err)
continue
}
c.backend.Gossip(c.valSet, p)
}
}
case _, ok := <-c.timeoutSub.Chan():
if !ok {
return
}
c.handleTimeoutMsg()
case event, ok := <-c.finalCommittedSub.Chan():
if !ok {
return
}
switch event.Data.(type) {
case istanbul.FinalCommittedEvent:
c.handleFinalCommitted()
}
}
}
}
// sendEvent sends events to mux
func (c *core) sendEvent(ev interface{}) {
c.backend.EventMux().Post(ev)
}
func (c *core) handleMsg(payload []byte) error {
logger := c.logger.New()
// Decode message and check its signature
msg := new(message)
if err := msg.FromPayload(payload, c.validateFn); err != nil {
logger.Error("Failed to decode message from payload", "err", err)
return err
}
// Only accept message if the address is valid
_, src := c.valSet.GetByAddress(msg.Address)
if src == nil {
logger.Error("Invalid address in message", "msg", msg)
return istanbul.ErrUnauthorizedAddress
}
return c.handleCheckedMsg(msg, src)
}
func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
logger := c.logger.New("address", c.address, "from", src)
// Store the message if it's a future message
testBacklog := func(err error) error {
if err == errFutureMessage {
c.storeBacklog(msg, src)
}
return err
}
switch msg.Code {
case msgPreprepare:
return testBacklog(c.handlePreprepare(msg, src))
case msgPrepare:
return testBacklog(c.handlePrepare(msg, src))
case msgCommit:
return testBacklog(c.handleCommit(msg, src))
case msgRoundChange:
return testBacklog(c.handleRoundChange(msg, src))
default:
logger.Error("Invalid message", "msg", msg)
}
return errInvalidMessage
}
func (c *core) handleTimeoutMsg() {
// Start the new round
round := c.current.Round()
nextRound := new(big.Int).Add(round, common.Big1)
c.startNewRound(nextRound)
// Start the timer
c.newRoundChangeTimer()
// Send Round Change
c.sendRoundChange(nextRound)
}

View File

@ -0,0 +1,127 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"testing"
)
// notice: the normal case have been tested in integration tests.
func TestHandleMsg(t *testing.T) {
N := uint64(4)
F := uint64(1)
sys := NewTestSystemWithBackend(N, F)
closer := sys.Run(true)
defer closer()
v0 := sys.backends[0]
r0 := v0.engine.(*core)
m, _ := Encode(&RoundChangeMessage{
View: &View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
},
PreparedRound: big.NewInt(0),
PreparedBlock: makeBlock(5),
})
// with a matched payload. msgPreprepare should match with *istanbul.Preprepare in normal case.
msg := &message{
Code: msgPreprepare,
Msg: m,
Address: v0.Address(),
Signature: []byte{},
CommittedSeal: []byte{},
}
_, val := v0.Validators(nil).GetByAddress(v0.Address())
if err := r0.handleCheckedMsg(msg, val); err != errFailedDecodePreprepare {
t.Errorf("error mismatch: have %v, want %v", err, errFailedDecodePreprepare)
}
m, _ = Encode(&RoundChangeMessage{
View: &View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
},
PreparedRound: big.NewInt(0),
PreparedBlock: makeBlock(1),
})
// with a unmatched payload. msgPrepare should match with *istanbul.Subject in normal case.
msg = &message{
Code: msgPrepare,
Msg: m,
Address: v0.Address(),
Signature: []byte{},
CommittedSeal: []byte{},
}
_, val = v0.Validators(nil).GetByAddress(v0.Address())
if err := r0.handleCheckedMsg(msg, val); err != errFailedDecodePrepare {
t.Errorf("error mismatch: have %v, want %v", err, errFailedDecodePreprepare)
}
m, _ = Encode(&RoundChangeMessage{
View: &View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
},
PreparedRound: big.NewInt(0),
PreparedBlock: makeBlock(5),
})
// with a unmatched payload. istanbul.MsgCommit should match with *istanbul.Subject in normal case.
msg = &message{
Code: msgCommit,
Msg: m,
Address: v0.Address(),
Signature: []byte{},
CommittedSeal: []byte{},
}
_, val = v0.Validators(nil).GetByAddress(v0.Address())
if err := r0.handleCheckedMsg(msg, val); err != errFailedDecodeCommit {
t.Errorf("error mismatch: have %v, want %v", err, errFailedDecodeCommit)
}
m, _ = Encode(&Preprepare{
View: &View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
},
Proposal: makeBlock(3),
})
// invalid message code. message code is not exists in list
msg = &message{
Code: uint64(99),
Msg: m,
Address: v0.Address(),
Signature: []byte{},
CommittedSeal: []byte{},
}
_, val = v0.Validators(nil).GetByAddress(v0.Address())
if err := r0.handleCheckedMsg(msg, val); err == nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// with malicious payload
if err := r0.handleMsg([]byte{1}); err == nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}

View File

@ -0,0 +1,180 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"fmt"
"io"
"math/big"
"strings"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/rlp"
)
// Construct a new message set to accumulate messages for given sequence/view number.
func newMessageSet(valSet istanbul.ValidatorSet) *messageSet {
return &messageSet{
view: &View{
Round: new(big.Int),
Sequence: new(big.Int),
},
messagesMu: new(sync.Mutex),
messages: make(map[common.Address]*message),
valSet: valSet,
}
}
// ----------------------------------------------------------------------------
type messageSet struct {
view *View
valSet istanbul.ValidatorSet
messagesMu *sync.Mutex
messages map[common.Address]*message
}
// messageMapAsStruct is a temporary holder struct to convert messages map to a slice when Encoding and Decoding messageSet
type messageMapAsStruct struct {
Address common.Address
Msg *message
}
func (ms *messageSet) View() *View {
return ms.view
}
func (ms *messageSet) Add(msg *message) error {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
if err := ms.verify(msg); err != nil {
return err
}
return ms.addVerifiedMessage(msg)
}
func (ms *messageSet) Values() (result []*message) {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
for _, v := range ms.messages {
result = append(result, v)
}
return result
}
func (ms *messageSet) Size() int {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
return len(ms.messages)
}
func (ms *messageSet) Get(addr common.Address) *message {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
return ms.messages[addr]
}
// ----------------------------------------------------------------------------
func (ms *messageSet) verify(msg *message) error {
// verify if the message comes from one of the validators
if _, v := ms.valSet.GetByAddress(msg.Address); v == nil {
return istanbul.ErrUnauthorizedAddress
}
// TODO: check view number and sequence number
return nil
}
func (ms *messageSet) addVerifiedMessage(msg *message) error {
ms.messages[msg.Address] = msg
return nil
}
func (ms *messageSet) String() string {
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
addresses := make([]string, 0, len(ms.messages))
for _, v := range ms.messages {
addresses = append(addresses, v.Address.String())
}
return fmt.Sprintf("[%v]", strings.Join(addresses, ", "))
}
// EncodeRLP serializes messageSet into Ethereum RLP format
// valSet is currently not being encoded.
func (ms *messageSet) EncodeRLP(w io.Writer) error {
if ms == nil {
return nil
}
ms.messagesMu.Lock()
defer ms.messagesMu.Unlock()
// maps cannot be RLP encoded, convert the map into a slice of struct and then encode it
var messagesAsSlice []messageMapAsStruct
for k, v := range ms.messages {
msgMapAsStruct := messageMapAsStruct{
Address: k,
Msg: v,
}
messagesAsSlice = append(messagesAsSlice, msgMapAsStruct)
}
return rlp.Encode(w, []interface{}{
ms.view,
// ms.valSet,
messagesAsSlice,
})
}
// DecodeRLP deserializes rlp stream into messageSet
// valSet is currently not being decoded
func (ms *messageSet) DecodeRLP(stream *rlp.Stream) error {
// Don't decode messageSet if the size of the stream is 0
_, size, _ := stream.Kind()
if size == 0 {
return nil
}
var msgSet struct {
MsgView *View
// valSet istanbul.ValidatorSet
MessagesSlice []messageMapAsStruct
}
if err := stream.Decode(&msgSet); err != nil {
return err
}
// convert the messages struct slice back to map
messages := make(map[common.Address]*message)
for _, msgStruct := range msgSet.MessagesSlice {
messages[msgStruct.Address] = msgStruct.Msg
}
ms.view = msgSet.MsgView
// ms.valSet = msgSet.valSet
ms.messages = messages
ms.messagesMu = new(sync.Mutex)
return nil
}

View File

@ -0,0 +1,179 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"testing"
"github.com/ethereum/go-ethereum/rlp"
)
func TestMessageSetWithPreprepare(t *testing.T) {
valSet := newTestValidatorSet(4)
ms := newMessageSet(valSet)
view := &View{
Round: new(big.Int),
Sequence: new(big.Int),
}
pp := &Preprepare{
View: view,
Proposal: makeBlock(1),
}
rawPP, err := rlp.EncodeToBytes(pp)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
msg := &message{
Code: msgPreprepare,
Msg: rawPP,
Address: valSet.GetProposer().Address(),
}
err = ms.Add(msg)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
err = ms.Add(msg)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if ms.Size() != 1 {
t.Errorf("the size of message set mismatch: have %v, want 1", ms.Size())
}
}
func TestMessageSetWithSubject(t *testing.T) {
valSet := newTestValidatorSet(4)
ms := newMessageSet(valSet)
view := &View{
Round: new(big.Int),
Sequence: new(big.Int),
}
sub := &Subject{
View: view,
Digest: makeBlock(5),
}
rawSub, err := rlp.EncodeToBytes(sub)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
msg := &message{
Code: msgPrepare,
Msg: rawSub,
Address: valSet.GetProposer().Address(),
}
err = ms.Add(msg)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
err = ms.Add(msg)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if ms.Size() != 1 {
t.Errorf("the size of message set mismatch: have %v, want 1", ms.Size())
}
}
// TestMessageSetEncodeDecode tests RLP encoding and decoding of messageSet, it does so by
// first encoding a RoundChangeMessage and then encoding a messageSet.
// It verifies encoding by decoding these messages and asserting the decoded values
func TestMessageSetEncodeDecode(t *testing.T) {
valSet := newTestValidatorSet(4)
ms := newMessageSet(valSet)
proposal := makeBlock(5)
view := &View{
Round: big.NewInt(0),
Sequence: big.NewInt(5),
}
ms.view = view
rc := &RoundChangeMessage{
View: view,
PreparedRound: big.NewInt(0),
PreparedBlock: proposal,
}
encodedRC, err := rlp.EncodeToBytes(rc)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
firstVal := valSet.List()[0]
msg := &message{
Code: 0,
Msg: encodedRC,
Address: firstVal.Address(),
Signature: []byte{},
CommittedSeal: []byte{},
}
ms.messages[firstVal.Address()] = msg
encodedMS, err := Encode(ms)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
encodedMessages := &message{
Code: msgRoundChange,
Msg: encodedMS,
}
var decodedMsgSet *messageSet
err = encodedMessages.Decode(&decodedMsgSet)
if err != nil {
t.Errorf("failed to decode messageSet: %v", err)
}
decodedMsg := decodedMsgSet.messages[firstVal.Address()]
if decodedMsg.Address != firstVal.Address() {
t.Errorf("messageset mismatch: have %v, want %v", decodedMsg.Address, firstVal.Address())
}
encodedRCMsg := &message{
Code: msgRoundChange,
Msg: decodedMsg.Msg,
Address: firstVal.Address(),
}
var rcMsg *RoundChangeMessage
err = encodedRCMsg.Decode(&rcMsg)
if rcMsg.PreparedBlock.Hash() != rc.PreparedBlock.Hash() {
t.Errorf("rc message mismatch: have %v, want %v", rcMsg.PreparedBlock.Hash(), rc.PreparedBlock.Hash())
}
}

View File

@ -0,0 +1,99 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"reflect"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func (c *core) sendPrepare() {
logger := c.logger.New("state", c.state)
sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
return
}
c.broadcast(&message{
Code: msgPrepare,
Msg: encodedSubject,
})
}
func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
// Decode PREPARE message
var prepare *Subject
err := msg.Decode(&prepare)
if err != nil {
return errFailedDecodePrepare
}
if err := c.checkMessage(msgPrepare, prepare.View); err != nil {
return err
}
// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state.
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}
c.acceptPrepare(msg, src)
// Change to Prepared state if we've received enough PREPARE messages
// and we are in earlier state before Prepared state.
if (c.current.GetPrepareOrCommitSize() >= c.QuorumSize()) && c.state.Cmp(StatePrepared) < 0 {
// IBFT REDUX
c.current.preparedRound = c.currentView().Round
c.current.preparedBlock = prepare.Digest
c.PreparedRoundPrepares = c.current.Prepares
c.setState(StatePrepared)
c.sendCommit()
}
return nil
}
// verifyPrepare verifies if the received PREPARE message is equivalent to our subject
func (c *core) verifyPrepare(prepare *Subject, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
sub := c.current.Subject()
if !reflect.DeepEqual(prepare.View, sub.View) || prepare.Digest.Hash().Hex() != sub.Digest.Hash().Hex() {
logger.Warn("Inconsistent subjects between PREPARE and proposal", "expected", sub, "got", prepare)
return errInconsistentSubject
}
return nil
}
func (c *core) acceptPrepare(msg *message, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
// Add the PREPARE message to current round state
if err := c.current.Prepares.Add(msg); err != nil {
logger.Error("Failed to add PREPARE message to round state", "msg", msg, "err", err)
return err
}
return nil
}

View File

@ -0,0 +1,350 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math"
"math/big"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/crypto"
)
func TestHandlePrepare(t *testing.T) {
N := uint64(4)
F := uint64(1)
proposal := newTestProposal()
expectedSubject := &Subject{
View: &View{
Round: big.NewInt(0),
Sequence: proposal.Number(),
},
Digest: proposal,
}
testCases := []struct {
system *testSystem
expectedErr error
}{
{
// normal case
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(1),
},
c.valSet,
)
if i == 0 {
// replica 0 is the proposer
c.state = StatePreprepared
}
}
return sys
}(),
nil,
},
{
// future message
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i == 0 {
// replica 0 is the proposer
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
c.state = StatePreprepared
} else {
c.current = newTestRoundState(
&View{
Round: big.NewInt(2),
Sequence: big.NewInt(3),
},
c.valSet,
)
}
}
return sys
}(),
errFutureMessage,
},
{
// subject not match
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i == 0 {
// replica 0 is the proposer
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
c.state = StatePreprepared
} else {
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(0),
},
c.valSet,
)
}
}
return sys
}(),
errOldMessage,
},
{
// subject not match
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i == 0 {
// replica 0 is the proposer
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
c.state = StatePreprepared
} else {
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(1)},
c.valSet,
)
}
}
return sys
}(),
errInconsistentSubject,
},
{
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
// save less than Ceil(2*N/3) replica
sys.backends = sys.backends[int(math.Ceil(float64(2*N)/3)):]
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
c.current = newTestRoundState(
expectedSubject.View,
c.valSet,
)
if i == 0 {
// replica 0 is the proposer
c.state = StatePreprepared
}
}
return sys
}(),
nil,
},
// TODO: double send message
}
OUTER:
for _, test := range testCases {
test.system.Run(false)
v0 := test.system.backends[0]
r0 := v0.engine.(*core)
for i, v := range test.system.backends {
validator := r0.valSet.GetByIndex(uint64(i))
m, _ := Encode(v.engine.(*core).current.Subject())
if err := r0.handlePrepare(&message{
Code: msgPrepare,
Msg: m,
Address: validator.Address(),
}, validator); err != nil {
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
continue OUTER
}
}
// prepared is normal case
if r0.state != StatePrepared {
// There are not enough PREPARE messages in core
if r0.state != StatePreprepared {
t.Errorf("state mismatch: have %v, want %v", r0.state, StatePreprepared)
}
if r0.current.Prepares.Size() >= r0.QuorumSize() {
t.Errorf("the size of PREPARE messages should be less than %v", r0.QuorumSize())
}
continue
}
// core should have 2F+1 before Ceil2Nby3Block and Ceil(2N/3) after Ceil2Nby3Block PREPARE messages
if r0.current.Prepares.Size() < r0.QuorumSize() {
t.Errorf("the size of PREPARE messages should be larger than 2F+1 or ceil(2N/3): size %v", r0.current.Commits.Size())
}
// a message will be delivered to backend if ceil(2N/3)
if int64(len(v0.sentMsgs)) != 1 {
t.Errorf("the Send() should be called once: times %v", len(test.system.backends[0].sentMsgs))
}
// verify COMMIT messages
decodedMsg := new(message)
err := decodedMsg.FromPayload(v0.sentMsgs[0], nil)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if decodedMsg.Code != msgCommit {
t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, msgCommit)
}
var m *Subject
err = decodedMsg.Decode(&m)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if !reflect.DeepEqual(m.View, expectedSubject.View) || m.Digest.Hash().Hex() != expectedSubject.Digest.Hash().Hex() {
t.Errorf("subject mismatch: have %v, want %v", m, expectedSubject)
}
}
}
// round is not checked for now
func TestVerifyPrepare(t *testing.T) {
// for log purpose
privateKey, _ := crypto.GenerateKey()
peer := validator.New(getPublicKeyAddress(privateKey))
valSet := validator.NewSet([]common.Address{peer.Address()}, istanbul.RoundRobin)
sys := NewTestSystemWithBackend(uint64(1), uint64(0))
testCases := []struct {
expected error
prepare *Subject
roundState *roundState
}{
{
// normal case
expected: nil,
prepare: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
{
// old message
expected: errInconsistentSubject,
prepare: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// different digest
expected: errInconsistentSubject,
prepare: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
Digest: makeBlock(5),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// malicious package(lack of sequence)
expected: errInconsistentSubject,
prepare: &Subject{
View: &View{Round: big.NewInt(0), Sequence: nil},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(1), Sequence: big.NewInt(1)},
valSet,
),
},
{
// wrong PREPARE message with same sequence but different round
expected: errInconsistentSubject,
prepare: &Subject{
View: &View{Round: big.NewInt(1), Sequence: big.NewInt(0)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
{
// wrong PREPARE message with same round but different sequence
expected: errInconsistentSubject,
prepare: &Subject{
View: &View{Round: big.NewInt(0), Sequence: big.NewInt(1)},
Digest: newTestProposal(),
},
roundState: newTestRoundState(
&View{Round: big.NewInt(0), Sequence: big.NewInt(0)},
valSet,
),
},
}
for i, test := range testCases {
c := sys.backends[0].engine.(*core)
c.current = test.roundState
if err := c.verifyPrepare(test.prepare, peer); err != nil {
if err != test.expected {
t.Errorf("result %d: error mismatch: have %v, want %v", i, err, test.expected)
}
}
}
}

View File

@ -0,0 +1,216 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"time"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func (c *core) sendPreprepare(request *Request) {
logger := c.logger.New("state", c.state)
rcMessages := request.RCMessages
if rcMessages == nil {
rcMessages = newMessageSet(c.valSet)
}
preparedMessages := request.PrepareMessages
if preparedMessages == nil {
preparedMessages = newMessageSet(c.valSet)
}
// If I'm the proposer and I have the same sequence with the proposal
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
curView := c.currentView()
preprepare, err := Encode(&Preprepare{
View: curView,
Proposal: request.Proposal,
RCMessages: rcMessages,
PreparedMessages: preparedMessages,
})
if err != nil {
logger.Error("Failed to encode", "view", curView)
return
}
c.broadcast(&message{
Code: msgPreprepare,
Msg: preprepare,
})
}
}
func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
// Decode PRE-PREPARE
var preprepare *Preprepare
err := msg.Decode(&preprepare)
if err != nil {
logger.Debug("Failed to decode preprepare message", "err", err)
return errFailedDecodePreprepare
}
// Ensure we have the same view with the PRE-PREPARE message
// If it is old message, see if we need to broadcast COMMIT
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
if err == errOldMessage {
// Get validator set for the given proposal
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal)
return nil
}
}
return err
}
// Check if the message comes from current proposer
if !c.valSet.IsProposer(src.Address()) {
logger.Warn("Ignore preprepare messages from non-proposer")
return errNotFromProposer
}
if preprepare.View.Round.Uint64() > 0 && !c.validatePrepreparedMessage(preprepare, src) {
logger.Error("Unable to verify prepared block in Round Change messages")
return errInvalidPreparedBlock
}
// Verify the proposal we received
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
// if it's a future block, we will handle it again after the duration
if err == consensus.ErrFutureBlock {
logger.Info("Proposed block will be handled in the future", "err", err, "duration", duration)
c.stopFuturePreprepareTimer()
c.futurePreprepareTimer = time.AfterFunc(duration, func() {
c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
})
} else {
logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
c.sendNextRoundChange()
}
return err
}
// Here is about to accept the PRE-PREPARE
if c.state == StateAcceptRequest {
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
return nil
}
func (c *core) acceptPreprepare(preprepare *Preprepare) {
c.consensusTimestamp = time.Now()
c.current.SetPreprepare(preprepare)
}
// validatePrepreparedMessage validates Preprepared message received
func (c *core) validatePrepreparedMessage(preprepare *Preprepare, src istanbul.Validator) bool {
logger := c.logger.New("from", src, "state", c.state)
c.newRoundChangeTimer()
highestPreparedRound, validRC := c.checkRoundChangeMessages(preprepare, src)
if !validRC {
logger.Error("Unable to verify Round Change messages in Preprepare")
return false
}
if highestPreparedRound != 0 && !c.checkPreparedMessages(preprepare, highestPreparedRound, src) {
logger.Error("Unable to verify Prepared messages in Preprepare")
return false
}
return true
}
// checkRoundChangeMessages verifies if the Round Change message is signed by a valid validator and
// Also, check if the proposal was the preparedBlock corresponding to the highest preparedRound
func (c *core) checkRoundChangeMessages(preprepare *Preprepare, src istanbul.Validator) (uint64, bool) {
logger := c.logger.New("from", src, "state", c.state)
if preprepare.RCMessages != nil && preprepare.RCMessages.messages != nil {
if c.validateFn != nil {
if err := validateMsgSignature(preprepare.RCMessages.messages, c.validateFn); err != nil {
logger.Error("Unable to validate round change message signature", "err", err)
return 0, false
}
}
var preparedRound uint64 = 0
var preparedBlock istanbul.Proposal
for _, msg := range preprepare.RCMessages.messages {
var rc *RoundChangeMessage
if err := msg.Decode(&rc); err != nil {
logger.Error("Failed to decode ROUND CHANGE", "err", err)
return 0, false
}
if rc.PreparedRound.Uint64() > preparedRound {
preparedRound = rc.PreparedRound.Uint64()
preparedBlock = rc.PreparedBlock
}
}
if preparedRound == 0 {
return preparedRound, true
}
if preparedRound > 0 {
return preparedRound, preparedBlock == preprepare.Proposal
}
}
return 0, false
}
// checkPreparedMessages verifies if a Quorum of Prepared messages were received and
// the block in each prepared message is the same as the proposal and is prepared in the same round
func (c *core) checkPreparedMessages(preprepare *Preprepare, highestPreparedRound uint64, src istanbul.Validator) bool {
logger := c.logger.New("from", src, "state", c.state)
if preprepare.PreparedMessages != nil && preprepare.PreparedMessages.messages != nil {
if c.validateFn != nil {
if err := validateMsgSignature(preprepare.PreparedMessages.messages, c.validateFn); err != nil {
logger.Error("Unable to validate round change message signature", "err", err)
return false
}
}
// Number of prepared messages should not be less than Quorum of messages
if len(preprepare.PreparedMessages.messages) < c.QuorumSize() {
logger.Error("Quorum of Prepared messages not found in Preprepare messages")
return false
}
// Check if the block in each prepared message is the one that is being proposed
for addr, msg := range preprepare.PreparedMessages.messages {
var prepare *Subject
if err := msg.Decode(&prepare); err != nil {
logger.Error("Failed to decode Prepared Message", "err", err)
return false
}
if prepare.Digest.Hash() != preprepare.Proposal.Hash() {
logger.Error("Prepared block does not match the Proposal", "Address", addr)
return false
}
if prepare.View.Round.Uint64() != highestPreparedRound {
logger.Error("Round in Prepared Block does not match the Highest Prepared Round", "Address", addr)
return false
}
}
}
return false
}

View File

@ -0,0 +1,207 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func newTestPreprepare(v *View) *Preprepare {
return &Preprepare{
View: v,
Proposal: newTestProposal(),
}
}
func TestHandlePreprepare(t *testing.T) {
N := uint64(4) // replica 0 is the proposer, it will send messages to others
F := uint64(1) // F does not affect tests
testCases := []struct {
system *testSystem
expectedRequest istanbul.Proposal
expectedErr error
existingBlock bool
}{
{
// normal case
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i != 0 {
c.state = StateAcceptRequest
}
}
return sys
}(),
newTestProposal(),
nil,
false,
},
{
// future message
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i != 0 {
c.state = StateAcceptRequest
// hack: force set subject that future message can be simulated
c.current = newTestRoundState(
&View{
Round: big.NewInt(0),
Sequence: big.NewInt(0),
},
c.valSet,
)
} else {
c.current.SetSequence(big.NewInt(10))
}
}
return sys
}(),
makeBlock(1),
errFutureMessage,
false,
},
{
// non-proposer
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
// force remove replica 0, let replica 1 be the proposer
sys.backends = sys.backends[1:]
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i != 0 {
// replica 0 is the proposer
c.state = StatePreprepared
}
}
return sys
}(),
makeBlock(1),
errNotFromProposer,
false,
},
{
// errOldMessage
func() *testSystem {
sys := NewTestSystemWithBackend(N, F)
for i, backend := range sys.backends {
c := backend.engine.(*core)
c.valSet = backend.peers
if i != 0 {
c.state = StatePreprepared
c.current.SetSequence(big.NewInt(10))
c.current.SetRound(big.NewInt(10))
}
}
return sys
}(),
makeBlock(1),
errOldMessage,
false,
},
}
OUTER:
for _, test := range testCases {
test.system.Run(false)
v0 := test.system.backends[0]
r0 := v0.engine.(*core)
curView := r0.currentView()
preprepare := &Preprepare{
View: curView,
Proposal: test.expectedRequest,
RCMessages: newMessageSet(r0.valSet),
PreparedMessages: newMessageSet(r0.valSet),
}
for i, v := range test.system.backends {
// i == 0 is primary backend, it is responsible for send PRE-PREPARE messages to others.
if i == 0 {
continue
}
c := v.engine.(*core)
m, _ := Encode(preprepare)
_, val := r0.valSet.GetByAddress(v0.Address())
// run each backends and verify handlePreprepare function.
if err := c.handlePreprepare(&message{
Code: msgPreprepare,
Msg: m,
Address: v0.Address(),
}, val); err != nil {
if err != test.expectedErr {
t.Errorf("error mismatch: have %v, want %v", err, test.expectedErr)
}
continue OUTER
}
if c.state != StatePreprepared {
t.Errorf("state mismatch: have %v, want %v", c.state, StatePreprepared)
}
if !test.existingBlock && !reflect.DeepEqual(c.current.Subject().View, curView) {
t.Errorf("view mismatch: have %v, want %v", c.current.Subject().View, curView)
}
// verify prepare messages
decodedMsg := new(message)
err := decodedMsg.FromPayload(v.sentMsgs[0], nil)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
expectedCode := msgPrepare
if test.existingBlock {
expectedCode = msgCommit
}
if decodedMsg.Code != expectedCode {
t.Errorf("message code mismatch: have %v, want %v", decodedMsg.Code, expectedCode)
}
var subject *Subject
err = decodedMsg.Decode(&subject)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if !test.existingBlock && !reflect.DeepEqual(subject, c.current.Subject()) {
t.Errorf("subject mismatch: have %v, want %v", subject, c.current.Subject())
}
}
}
}

View File

@ -0,0 +1,99 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func (c *core) handleRequest(request *Request) error {
logger := c.logger.New("state", c.state, "seq", c.current.sequence)
if err := c.checkRequestMsg(request); err != nil {
if err == errInvalidMessage {
logger.Warn("invalid request")
return err
}
logger.Warn("unexpected request", "err", err, "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
return err
}
logger.Trace("handleRequest", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
c.current.pendingRequest = request
if c.state == StateAcceptRequest {
c.sendPreprepare(request)
}
return nil
}
// check request state
// return errInvalidMessage if the message is invalid
// return errFutureMessage if the sequence of proposal is larger than current sequence
// return errOldMessage if the sequence of proposal is smaller than current sequence
func (c *core) checkRequestMsg(request *Request) error {
if request == nil || request.Proposal == nil {
return errInvalidMessage
}
if c := c.current.sequence.Cmp(request.Proposal.Number()); c > 0 {
return errOldMessage
} else if c < 0 {
return errFutureMessage
} else {
return nil
}
}
func (c *core) storeRequestMsg(request *Request) {
logger := c.logger.New("state", c.state)
logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
c.pendingRequestsMu.Lock()
defer c.pendingRequestsMu.Unlock()
c.pendingRequests.Push(request, float32(-request.Proposal.Number().Int64()))
}
func (c *core) processPendingRequests() {
c.pendingRequestsMu.Lock()
defer c.pendingRequestsMu.Unlock()
for !(c.pendingRequests.Empty()) {
m, prio := c.pendingRequests.Pop()
r, ok := m.(*Request)
if !ok {
c.logger.Warn("Malformed request, skip", "msg", m)
continue
}
// Push back if it's a future message
err := c.checkRequestMsg(r)
if err != nil {
if err == errFutureMessage {
c.logger.Trace("Stop processing request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())
c.pendingRequests.Push(m, prio)
break
}
c.logger.Trace("Skip the pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash(), "err", err)
continue
}
c.logger.Trace("Post pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())
go c.sendEvent(istanbul.RequestEvent{
Proposal: r.Proposal,
})
}
}

View File

@ -0,0 +1,137 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"reflect"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
func TestCheckRequestMsg(t *testing.T) {
c := &core{
state: StateAcceptRequest,
current: newRoundState(&View{
Sequence: big.NewInt(1),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil, nil, nil),
}
// invalid request
err := c.checkRequestMsg(nil)
if err != errInvalidMessage {
t.Errorf("error mismatch: have %v, want %v", err, errInvalidMessage)
}
r := &Request{
Proposal: nil,
}
err = c.checkRequestMsg(r)
if err != errInvalidMessage {
t.Errorf("error mismatch: have %v, want %v", err, errInvalidMessage)
}
// old request
r = &Request{
Proposal: makeBlock(0),
}
err = c.checkRequestMsg(r)
if err != errOldMessage {
t.Errorf("error mismatch: have %v, want %v", err, errOldMessage)
}
// future request
r = &Request{
Proposal: makeBlock(2),
}
err = c.checkRequestMsg(r)
if err != errFutureMessage {
t.Errorf("error mismatch: have %v, want %v", err, errFutureMessage)
}
// current request
r = &Request{
Proposal: makeBlock(1),
}
err = c.checkRequestMsg(r)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
}
func TestStoreRequestMsg(t *testing.T) {
backend := &testSystemBackend{
events: new(event.TypeMux),
}
c := &core{
logger: log.New("backend", "test", "id", 0),
backend: backend,
state: StateAcceptRequest,
current: newRoundState(&View{
Sequence: big.NewInt(0),
Round: big.NewInt(0),
}, newTestValidatorSet(4), nil, nil, nil),
pendingRequests: prque.New(),
pendingRequestsMu: new(sync.Mutex),
}
requests := []Request{
{
Proposal: makeBlock(1),
},
{
Proposal: makeBlock(2),
},
{
Proposal: makeBlock(3),
},
}
c.storeRequestMsg(&requests[1])
c.storeRequestMsg(&requests[0])
c.storeRequestMsg(&requests[2])
if c.pendingRequests.Size() != len(requests) {
t.Errorf("the size of pending requests mismatch: have %v, want %v", c.pendingRequests.Size(), len(requests))
}
c.current.sequence = big.NewInt(3)
c.subscribeEvents()
defer c.unsubscribeEvents()
c.processPendingRequests()
const timeoutDura = 2 * time.Second
timeout := time.NewTimer(timeoutDura)
select {
case ev := <-c.events.Chan():
e, ok := ev.Data.(istanbul.RequestEvent)
if !ok {
t.Errorf("unexpected event comes: %v", reflect.TypeOf(ev.Data))
}
if e.Proposal.Number().Cmp(requests[2].Proposal.Number()) != 0 {
t.Errorf("the number of proposal mismatch: have %v, want %v", e.Proposal.Number(), requests[2].Proposal.Number())
}
case <-timeout.C:
t.Error("unexpected timeout occurs")
}
}

View File

@ -0,0 +1,319 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/ethereum/go-ethereum/core/types"
"math/big"
"sort"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
// sendNextRoundChange sends the ROUND CHANGE message with current round + 1
func (c *core) sendNextRoundChange() {
cv := c.currentView()
c.sendRoundChange(new(big.Int).Add(cv.Round, common.Big1))
}
// sendRoundChange sends the ROUND CHANGE message with the given round
func (c *core) sendRoundChange(round *big.Int) {
logger := c.logger.New("state", c.state)
cv := c.currentView()
if cv.Round.Cmp(round) > 0 {
logger.Error("Cannot send out the round change", "current round", cv.Round, "target round", round)
return
}
// Now we have the new round number and sequence number
prepares := c.PreparedRoundPrepares
if prepares == nil {
prepares = newMessageSet(c.valSet)
}
// If a block has not been prepared and a round change message occurs the preparedBlock is nil, setting it to NilBlock, so that decoding works fine
preparedBlock := c.current.preparedBlock
if preparedBlock == nil {
preparedBlock = NilBlock()
}
cv = c.currentView()
rc := &RoundChangeMessage{
View: cv,
PreparedRound: c.current.preparedRound,
PreparedBlock: preparedBlock,
PreparedMessages: prepares,
}
payload, err := Encode(rc)
if err != nil {
logger.Error("Failed to encode ROUND CHANGE", "rc", rc, "err", err)
return
}
c.broadcast(&message{
Code: msgRoundChange,
Msg: payload,
})
c.current.rcMsgSentInRound = round
}
func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
// Decode ROUND CHANGE message
var rc *RoundChangeMessage
if err := msg.Decode(&rc); err != nil {
logger.Error("Failed to decode ROUND CHANGE", "err", err)
return errInvalidMessage
}
if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
return err
}
if rc.PreparedMessages != nil && rc.PreparedMessages.messages != nil {
if c.validateFn != nil {
if err := validateMsgSignature(rc.PreparedMessages.messages, c.validateFn); err != nil {
logger.Error("Unable to validate round change message signature", "err", err)
return err
}
}
}
cv := c.currentView()
roundView := rc.View
// Add the ROUND CHANGE message to its message set and return how many
// messages we've got with the same round number and sequence number.
if roundView.Round.Cmp(cv.Round) >= 0 {
pb := rc.PreparedBlock
pr := rc.PreparedRound
// Checking if NilBlock was sent as prepared block
if NilBlock().Hash() == pb.Hash() {
pb = nil
pr = nil
}
err := c.roundChangeSet.Add(roundView.Round, msg, pr, pb)
if err != nil {
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
return err
}
}
num := c.roundChangeSet.higherRoundMessages(cv.Round)
currentRoundMessages := c.roundChangeSet.getRCMessagesForGivenRound(cv.Round)
if num == int(c.valSet.F()+1) {
newRound := c.roundChangeSet.getMinRoundChange(cv.Round)
logger.Trace("Starting new Round", "round", newRound)
c.startNewRound(newRound)
c.newRoundChangeTimer()
c.sendRoundChange(newRound)
} else if currentRoundMessages == c.QuorumSize() && c.IsProposer() && c.justifyRoundChange(cv.Round) {
_, proposal := c.highestPrepared(cv.Round)
if proposal == nil {
proposal = c.current.pendingRequest.Proposal
}
r := &Request{
Proposal: proposal,
RCMessages: c.roundChangeSet.roundChanges[cv.Round.Uint64()],
PrepareMessages: c.PreparedRoundPrepares,
}
c.sendPreprepare(r)
}
return nil
}
// justifyRoundChange validates if the round change is valid or not
func (c *core) justifyRoundChange(round *big.Int) bool {
if pr := c.roundChangeSet.preparedRounds[round.Uint64()]; pr == nil {
return true
}
pr, pv := c.highestPrepared(round)
// Check if the block in each prepared message is the one that is being proposed
// To handle the case where a byzantine node can send an empty prepared block, check atleast Quorum of prepared blocks match the condition and not all
i := 0
for addr, msg := range c.PreparedRoundPrepares.messages {
var prepare *Subject
if err := msg.Decode(&prepare); err != nil {
c.logger.Error("Failed to decode Prepared Message", "err", err)
return false
}
if prepare.Digest.Hash() != pv.Hash() {
c.logger.Error("Highest Prepared Block does not match the Proposal", "Address", addr)
return false
}
if prepare.View.Round.Uint64() != pr.Uint64() {
c.logger.Error("Round in Prepared Block does not match the Highest Prepared Round", "Address", addr)
return false
}
i++
if i == c.QuorumSize() {
// validated Quorum of prepared messages
return true
}
}
return false
}
// highestPrepared returns the highest Prepared Round and the corresponding Prepared Block
func (c *core) highestPrepared(round *big.Int) (*big.Int, istanbul.Proposal) {
return c.roundChangeSet.preparedRounds[round.Uint64()], c.roundChangeSet.preparedBlocks[round.Uint64()]
}
// ----------------------------------------------------------------------------
func newRoundChangeSet(valSet istanbul.ValidatorSet) *roundChangeSet {
return &roundChangeSet{
validatorSet: valSet,
roundChanges: make(map[uint64]*messageSet),
mu: new(sync.Mutex),
}
}
type roundChangeSet struct {
validatorSet istanbul.ValidatorSet
roundChanges map[uint64]*messageSet
preparedRounds map[uint64]*big.Int
preparedBlocks map[uint64]istanbul.Proposal
mu *sync.Mutex
}
// Add adds the round and message into round change set
func (rcs *roundChangeSet) Add(r *big.Int, msg *message, preparedRound *big.Int, preparedBlock istanbul.Proposal) error {
rcs.mu.Lock()
defer rcs.mu.Unlock()
round := r.Uint64()
if rcs.roundChanges[round] == nil {
rcs.roundChanges[round] = newMessageSet(rcs.validatorSet)
}
err := rcs.roundChanges[round].Add(msg)
if err != nil {
return err
}
if rcs.preparedRounds == nil {
rcs.preparedRounds = make(map[uint64]*big.Int)
}
if rcs.preparedBlocks == nil {
rcs.preparedBlocks = make(map[uint64]istanbul.Proposal)
}
if rcs.preparedRounds[round] == nil {
rcs.preparedRounds[round] = preparedRound
rcs.preparedBlocks[round] = preparedBlock
} else if preparedRound.Cmp(rcs.preparedRounds[round]) > 0 {
rcs.preparedRounds[round] = preparedRound
rcs.preparedBlocks[round] = preparedBlock
}
return nil
}
// higherRoundMessages returns the number of Round Change messages received for the round greater than the given round
func (rcs *roundChangeSet) higherRoundMessages(round *big.Int) int {
rcs.mu.Lock()
defer rcs.mu.Unlock()
num := 0
for k, rms := range rcs.roundChanges {
if k > round.Uint64() {
num = num + len(rms.messages)
}
}
return num
}
func (rcs *roundChangeSet) getRCMessagesForGivenRound(round *big.Int) int {
rcs.mu.Lock()
defer rcs.mu.Unlock()
if rms := rcs.roundChanges[round.Uint64()]; rms != nil {
return len(rms.messages)
}
return 0
}
// getMinRoundChange returns the minimum round greater than the given round
func (rcs *roundChangeSet) getMinRoundChange(round *big.Int) *big.Int {
rcs.mu.Lock()
defer rcs.mu.Unlock()
var keys []int
for k := range rcs.roundChanges {
if k > round.Uint64() {
keys = append(keys, int(k))
}
}
sort.Ints(keys)
if len(keys) == 0 {
return round
}
return big.NewInt(int64(keys[0]))
}
// ClearLowerThan deletes the messages for round earlier than the given round
func (rcs *roundChangeSet) ClearLowerThan(round *big.Int) {
rcs.mu.Lock()
defer rcs.mu.Unlock()
for k, rms := range rcs.roundChanges {
if len(rms.Values()) == 0 || k < round.Uint64() {
delete(rcs.roundChanges, k)
delete(rcs.preparedRounds, k)
delete(rcs.preparedBlocks, k)
}
}
}
// MaxRound returns the max round which the number of messages is equal or larger than num
func (rcs *roundChangeSet) MaxRound(num int) *big.Int {
rcs.mu.Lock()
defer rcs.mu.Unlock()
var maxRound *big.Int
for k, rms := range rcs.roundChanges {
if rms.Size() < num {
continue
}
r := big.NewInt(int64(k))
if maxRound == nil || maxRound.Cmp(r) < 0 {
maxRound = r
}
}
return maxRound
}
// NilBlock represents a nil block and is sent in RoundChangeMessage if PreparedBlock is nil
func NilBlock() *types.Block {
header := &types.Header{
Difficulty: big.NewInt(0),
Number: big.NewInt(0),
GasLimit: 0,
GasUsed: 0,
Time: 0,
}
block := &types.Block{}
return block.WithSeal(header)
}

View File

@ -0,0 +1,92 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"testing"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
)
func TestRoundChangeSet(t *testing.T) {
vset := validator.NewSet(generateValidators(4), istanbul.RoundRobin)
rc := newRoundChangeSet(vset)
view := &View{
Sequence: big.NewInt(1),
Round: big.NewInt(1),
}
r := &RoundChangeMessage{
View: view,
PreparedRound: big.NewInt(1),
PreparedBlock: newTestProposal(),
}
m, _ := Encode(r)
// Test Add()
// Add message from all validators
for i, v := range vset.List() {
msg := &message{
Code: msgRoundChange,
Msg: m,
Address: v.Address(),
}
rc.Add(view.Round, msg, r.PreparedRound, r.PreparedBlock)
if rc.roundChanges[view.Round.Uint64()].Size() != i+1 {
t.Errorf("the size of round change messages mismatch: have %v, want %v", rc.roundChanges[view.Round.Uint64()].Size(), i+1)
}
}
// Add message again from all validators, but the size should be the same
for _, v := range vset.List() {
msg := &message{
Code: msgRoundChange,
Msg: m,
Address: v.Address(),
}
rc.Add(view.Round, msg, r.PreparedRound, r.PreparedBlock)
if rc.roundChanges[view.Round.Uint64()].Size() != vset.Size() {
t.Errorf("the size of round change messages mismatch: have %v, want %v", rc.roundChanges[view.Round.Uint64()].Size(), vset.Size())
}
}
// Test MaxRound()
for i := 0; i < 10; i++ {
maxRound := rc.MaxRound(i)
if i <= vset.Size() {
if maxRound == nil || maxRound.Cmp(view.Round) != 0 {
t.Errorf("max round mismatch: have %v, want %v", maxRound, view.Round)
}
} else if maxRound != nil {
t.Errorf("max round mismatch: have %v, want nil", maxRound)
}
}
// Test ClearLowerThan()
for i := int64(0); i < 2; i++ {
rc.ClearLowerThan(big.NewInt(i))
if rc.roundChanges[view.Round.Uint64()].Size() != vset.Size() {
t.Errorf("the size of round change messages mismatch: have %v, want %v", rc.roundChanges[view.Round.Uint64()].Size(), vset.Size())
}
}
rc.ClearLowerThan(big.NewInt(2))
if rc.roundChanges[view.Round.Uint64()] != nil {
t.Errorf("the change messages mismatch: have %v, want nil", rc.roundChanges[view.Round.Uint64()])
}
}

View File

@ -0,0 +1,218 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"io"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/rlp"
)
// newRoundState creates a new roundState instance with the given view and validatorSet
func newRoundState(view *View, validatorSet istanbul.ValidatorSet, preprepare *Preprepare, pendingRequest *Request, hasBadProposal func(hash common.Hash) bool) *roundState {
return &roundState{
round: view.Round,
sequence: view.Sequence,
Preprepare: preprepare,
Prepares: newMessageSet(validatorSet),
Commits: newMessageSet(validatorSet),
mu: new(sync.RWMutex),
pendingRequest: pendingRequest,
hasBadProposal: hasBadProposal,
rcMsgSentInRound: big.NewInt(0),
}
}
// roundState stores the consensus state
type roundState struct {
round *big.Int
sequence *big.Int
Preprepare *Preprepare
Prepares *messageSet
Commits *messageSet
pendingRequest *Request
preparedRound *big.Int
preparedBlock istanbul.Proposal
rcMsgSentInRound *big.Int
mu *sync.RWMutex
hasBadProposal func(hash common.Hash) bool
}
func (s *roundState) GetPrepareOrCommitSize() int {
s.mu.RLock()
defer s.mu.RUnlock()
result := s.Prepares.Size() + s.Commits.Size()
// find duplicate one
for _, m := range s.Prepares.Values() {
if s.Commits.Get(m.Address) != nil {
result--
}
}
return result
}
func (s *roundState) Subject() *Subject {
s.mu.RLock()
defer s.mu.RUnlock()
if s.Preprepare == nil {
return nil
}
return &Subject{
View: &View{
Round: new(big.Int).Set(s.round),
Sequence: new(big.Int).Set(s.sequence),
},
Digest: s.Preprepare.Proposal,
}
}
func (s *roundState) SetPreprepare(preprepare *Preprepare) {
s.mu.Lock()
defer s.mu.Unlock()
s.Preprepare = preprepare
}
func (s *roundState) Proposal() istanbul.Proposal {
s.mu.RLock()
defer s.mu.RUnlock()
if s.Preprepare != nil {
return s.Preprepare.Proposal
}
return nil
}
func (s *roundState) SetRound(r *big.Int) {
s.mu.Lock()
defer s.mu.Unlock()
s.round = new(big.Int).Set(r)
}
func (s *roundState) Round() *big.Int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.round
}
func (s *roundState) SetSequence(seq *big.Int) {
s.mu.Lock()
defer s.mu.Unlock()
s.sequence = seq
}
func (s *roundState) Sequence() *big.Int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.sequence
}
/*func (s *roundState) LockHash() {
s.mu.Lock()
defer s.mu.Unlock()
if s.Preprepare != nil {
s.lockedHash = s.Preprepare.Proposal.Hash()
}
}*/
/*func (s *roundState) UnlockHash() {
s.mu.Lock()
defer s.mu.Unlock()
s.lockedHash = common.Hash{}
}*/
/*func (s *roundState) IsHashLocked() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if common.EmptyHash(s.lockedHash) {
return false
}
return !s.hasBadProposal(s.GetLockedHash())
}*/
/*func (s *roundState) GetLockedHash() common.Hash {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lockedHash
}*/
// The DecodeRLP method should read one value from the given
// Stream. It is not forbidden to read less or more, but it might
// be confusing.
func (s *roundState) DecodeRLP(stream *rlp.Stream) error {
var ss struct {
Round *big.Int
Sequence *big.Int
Preprepare *Preprepare
Prepares *messageSet
Commits *messageSet
pendingRequest *Request
}
if err := stream.Decode(&ss); err != nil {
return err
}
s.round = ss.Round
s.sequence = ss.Sequence
s.Preprepare = ss.Preprepare
s.Prepares = ss.Prepares
s.Commits = ss.Commits
s.pendingRequest = ss.pendingRequest
s.mu = new(sync.RWMutex)
return nil
}
// EncodeRLP should write the RLP encoding of its receiver to w.
// If the implementation is a pointer method, it may also be
// called for nil pointers.
//
// Implementations should generate valid RLP. The data written is
// not verified at the moment, but a future version might. It is
// recommended to write only a single value but writing multiple
// values or no value at all is also permitted.
func (s *roundState) EncodeRLP(w io.Writer) error {
s.mu.RLock()
defer s.mu.RUnlock()
return rlp.Encode(w, []interface{}{
s.round,
s.sequence,
s.Preprepare,
s.Prepares,
s.Commits,
s.pendingRequest,
})
}

View File

@ -0,0 +1,39 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
func newTestRoundState(view *View, validatorSet istanbul.ValidatorSet) *roundState {
return &roundState{
round: view.Round,
sequence: view.Sequence,
Preprepare: newTestPreprepare(view),
Prepares: newMessageSet(validatorSet),
Commits: newMessageSet(validatorSet),
mu: new(sync.RWMutex),
hasBadProposal: func(hash common.Hash) bool {
return false
},
preparedRound: common.Big0,
}
}

View File

@ -0,0 +1,299 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/core/rawdb"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
elog "github.com/ethereum/go-ethereum/log"
)
var testLogger = elog.New()
type testSystemBackend struct {
id uint64
sys *testSystem
engine Engine
peers istanbul.ValidatorSet
events *event.TypeMux
committedMsgs []testCommittedMsgs
sentMsgs [][]byte // store the message when Send is called by core
address common.Address
db ethdb.Database
}
type testCommittedMsgs struct {
commitProposal istanbul.Proposal
committedSeals [][]byte
}
// ==============================================
//
// define the functions that needs to be provided for Istanbul.
func (self *testSystemBackend) Address() common.Address {
return self.address
}
// Peers returns all connected peers
func (self *testSystemBackend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
return self.peers
}
func (self *testSystemBackend) EventMux() *event.TypeMux {
return self.events
}
func (self *testSystemBackend) Send(message []byte, target common.Address) error {
testLogger.Info("enqueuing a message...", "address", self.Address())
self.sentMsgs = append(self.sentMsgs, message)
self.sys.queuedMessage <- istanbul.MessageEvent{
Payload: message,
}
return nil
}
func (self *testSystemBackend) Broadcast(valSet istanbul.ValidatorSet, message []byte) error {
testLogger.Info("enqueuing a message...", "address", self.Address())
self.sentMsgs = append(self.sentMsgs, message)
self.sys.queuedMessage <- istanbul.MessageEvent{
Payload: message,
}
return nil
}
func (self *testSystemBackend) Gossip(valSet istanbul.ValidatorSet, message []byte) error {
testLogger.Warn("not sign any data")
return nil
}
func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
testLogger.Info("commit message", "address", self.Address())
self.committedMsgs = append(self.committedMsgs, testCommittedMsgs{
commitProposal: proposal,
committedSeals: seals,
})
// fake new head events
go self.events.Post(istanbul.FinalCommittedEvent{})
return nil
}
func (self *testSystemBackend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
return 0, nil
}
func (self *testSystemBackend) Sign(data []byte) ([]byte, error) {
testLogger.Info("returning current backend address so that CheckValidatorSignature returns the same value")
return self.address.Bytes(), nil
}
func (self *testSystemBackend) CheckSignature([]byte, common.Address, []byte) error {
return nil
}
func (self *testSystemBackend) CheckValidatorSignature(data []byte, sig []byte) (common.Address, error) {
return common.BytesToAddress(sig), nil
}
func (self *testSystemBackend) Hash(b interface{}) common.Hash {
return common.StringToHash("Test")
}
func (self *testSystemBackend) NewRequest(request istanbul.Proposal) {
go self.events.Post(istanbul.RequestEvent{
Proposal: request,
})
}
func (self *testSystemBackend) HasBadProposal(hash common.Hash) bool {
return false
}
func (self *testSystemBackend) LastProposal() (istanbul.Proposal, common.Address) {
l := len(self.committedMsgs)
if l > 0 {
return self.committedMsgs[l-1].commitProposal, common.Address{}
}
return makeBlock(0), common.Address{}
}
// Only block height 5 will return true
func (self *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool {
return number.Cmp(big.NewInt(5)) == 0
}
func (self *testSystemBackend) GetProposer(number uint64) common.Address {
return common.Address{}
}
func (self *testSystemBackend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
return self.peers
}
func (sb *testSystemBackend) Close() error {
return nil
}
func (sb *testSystemBackend) IsQIBFTConsensus() bool {
return true
}
func (sb *testSystemBackend) StartQIBFTConsensus() error {
return nil
}
// ==============================================
//
// define the struct that need to be provided for integration tests.
type testSystem struct {
backends []*testSystemBackend
queuedMessage chan istanbul.MessageEvent
quit chan struct{}
}
func newTestSystem(n uint64) *testSystem {
testLogger.SetHandler(elog.StdoutHandler)
return &testSystem{
backends: make([]*testSystemBackend, n),
queuedMessage: make(chan istanbul.MessageEvent),
quit: make(chan struct{}),
}
}
func generateValidators(n int) []common.Address {
vals := make([]common.Address, 0)
for i := 0; i < n; i++ {
privateKey, _ := crypto.GenerateKey()
vals = append(vals, crypto.PubkeyToAddress(privateKey.PublicKey))
}
return vals
}
func newTestValidatorSet(n int) istanbul.ValidatorSet {
return validator.NewSet(generateValidators(n), istanbul.RoundRobin)
}
// FIXME: int64 is needed for N and F
func NewTestSystemWithBackend(n, f uint64) *testSystem {
testLogger.SetHandler(elog.StdoutHandler)
addrs := generateValidators(int(n))
sys := newTestSystem(n)
config := istanbul.DefaultConfig
for i := uint64(0); i < n; i++ {
vset := validator.NewSet(addrs, istanbul.RoundRobin)
backend := sys.NewBackend(i)
backend.peers = vset
backend.address = vset.GetByIndex(i).Address()
core := New(backend, config).(*core)
core.state = StateAcceptRequest
core.current = newRoundState(&View{
Round: big.NewInt(0),
Sequence: big.NewInt(1),
}, vset, nil, nil, func(hash common.Hash) bool {
return false
})
core.valSet = vset
core.logger = testLogger
core.validateFn = backend.CheckValidatorSignature
backend.engine = core
}
return sys
}
// listen will consume messages from queue and deliver a message to core
func (t *testSystem) listen() {
for {
select {
case <-t.quit:
return
case queuedMessage := <-t.queuedMessage:
testLogger.Info("consuming a queue message...")
for _, backend := range t.backends {
go backend.EventMux().Post(queuedMessage)
}
}
}
}
// Run will start system components based on given flag, and returns a closer
// function that caller can control lifecycle
//
// Given a true for core if you want to initialize core engine.
func (t *testSystem) Run(core bool) func() {
for _, b := range t.backends {
if core {
b.engine.Start() // start Istanbul core
}
}
go t.listen()
closer := func() { t.stop(core) }
return closer
}
func (t *testSystem) stop(core bool) {
close(t.quit)
for _, b := range t.backends {
if core {
b.engine.Stop()
}
}
}
func (t *testSystem) NewBackend(id uint64) *testSystemBackend {
// assume always success
ethDB := rawdb.NewMemoryDatabase()
backend := &testSystemBackend{
id: id,
sys: t,
events: new(event.TypeMux),
db: ethDB,
}
t.backends[id] = backend
return backend
}
// ==============================================
//
// helper functions.
func getPublicKeyAddress(privateKey *ecdsa.PrivateKey) common.Address {
return crypto.PubkeyToAddress(privateKey.PublicKey)
}

View File

@ -0,0 +1,320 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"bytes"
"fmt"
"io"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type Engine interface {
Start() error
Stop() error
IsProposer() bool
// verify if a hash is the same as the proposed block in the current pending request
//
// this is useful when the engine is currently the proposer
//
// pending request is populated right at the preprepare stage so this would give us the earliest verification
// to avoid any race condition of coming propagated blocks
IsCurrentProposal(blockHash common.Hash) bool
}
type State uint64
const (
StateAcceptRequest State = iota
StatePreprepared
StatePrepared
StateCommitted
)
func (s State) String() string {
if s == StateAcceptRequest {
return "Accept request"
} else if s == StatePreprepared {
return "Preprepared"
} else if s == StatePrepared {
return "Prepared"
} else if s == StateCommitted {
return "Committed"
} else {
return "Unknown"
}
}
// Cmp compares s and y and returns:
// -1 if s is the previous state of y
// 0 if s and y are the same state
// +1 if s is the next state of y
func (s State) Cmp(y State) int {
if uint64(s) < uint64(y) {
return -1
}
if uint64(s) > uint64(y) {
return 1
}
return 0
}
const (
msgPreprepare uint64 = iota
msgPrepare
msgCommit
msgRoundChange
msgAll
)
type message struct {
Code uint64
Msg []byte
Address common.Address
Signature []byte
CommittedSeal []byte
}
// ==============================================
//
// define the functions that needs to be provided for rlp Encoder/Decoder.
// EncodeRLP serializes m into the Ethereum RLP format.
func (m *message) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{m.Code, m.Msg, m.Address, m.Signature, m.CommittedSeal})
}
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (m *message) DecodeRLP(s *rlp.Stream) error {
var msg struct {
Code uint64
Msg []byte
Address common.Address
Signature []byte
CommittedSeal []byte
}
if err := s.Decode(&msg); err != nil {
return err
}
m.Code, m.Msg, m.Address, m.Signature, m.CommittedSeal = msg.Code, msg.Msg, msg.Address, msg.Signature, msg.CommittedSeal
return nil
}
// ==============================================
//
// define the functions that needs to be provided for core.
func (m *message) FromPayload(b []byte, validateFn func([]byte, []byte) (common.Address, error)) error {
// Decode message
err := rlp.DecodeBytes(b, &m)
if err != nil {
return err
}
// Validate message (on a message without Signature)
if validateFn != nil {
var payload []byte
payload, err = m.PayloadNoSig()
if err != nil {
return err
}
signerAdd, err := validateFn(payload, m.Signature)
if err != nil {
return err
}
if bytes.Compare(signerAdd.Bytes(), m.Address.Bytes()) != 0 {
return errInvalidSigner
}
}
return nil
}
func (m *message) Payload() ([]byte, error) {
return rlp.EncodeToBytes(m)
}
func (m *message) PayloadNoSig() ([]byte, error) {
return rlp.EncodeToBytes(&message{
Code: m.Code,
Msg: m.Msg,
Address: m.Address,
Signature: []byte{},
CommittedSeal: m.CommittedSeal,
})
}
func (m *message) Decode(val interface{}) error {
return rlp.DecodeBytes(m.Msg, val)
}
func (m *message) String() string {
return fmt.Sprintf("{Code: %v, Address: %v}", m.Code, m.Address.String())
}
// ==============================================
//
// helper functions
func Encode(val interface{}) ([]byte, error) {
return rlp.EncodeToBytes(val)
}
type Request struct {
Proposal istanbul.Proposal
RCMessages *messageSet
PrepareMessages *messageSet
}
// View includes a round number and a sequence number.
// Sequence is the block number we'd like to commit.
// Each round has a number and is composed by 3 steps: preprepare, prepare and commit.
//
// If the given block is not accepted by validators, a round change will occur
// and the validators start a new round with round+1.
type View struct {
Round *big.Int
Sequence *big.Int
}
// EncodeRLP serializes b into the Ethereum RLP format.
func (v *View) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{v.Round, v.Sequence})
}
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (v *View) DecodeRLP(s *rlp.Stream) error {
var view struct {
Round *big.Int
Sequence *big.Int
}
if err := s.Decode(&view); err != nil {
return err
}
v.Round, v.Sequence = view.Round, view.Sequence
return nil
}
func (v *View) String() string {
return fmt.Sprintf("{Round: %d, Sequence: %d}", v.Round.Uint64(), v.Sequence.Uint64())
}
// Cmp compares v and y and returns:
// -1 if v < y
// 0 if v == y
// +1 if v > y
func (v *View) Cmp(y *View) int {
if v.Sequence.Cmp(y.Sequence) != 0 {
return v.Sequence.Cmp(y.Sequence)
}
if v.Round.Cmp(y.Round) != 0 {
return v.Round.Cmp(y.Round)
}
return 0
}
type Preprepare struct {
View *View
Proposal istanbul.Proposal
RCMessages *messageSet
PreparedMessages *messageSet
}
// EncodeRLP serializes b into the Ethereum RLP format.
func (b *Preprepare) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{b.View, b.Proposal, b.RCMessages, b.PreparedMessages})
}
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (b *Preprepare) DecodeRLP(s *rlp.Stream) error {
var preprepare struct {
View *View
Proposal *types.Block
RCMessages *messageSet
PreparedMessages *messageSet
}
if err := s.Decode(&preprepare); err != nil {
return err
}
b.View, b.Proposal, b.RCMessages, b.PreparedMessages = preprepare.View, preprepare.Proposal, preprepare.RCMessages, preprepare.PreparedMessages
return nil
}
type Subject struct {
View *View
Digest istanbul.Proposal
}
// EncodeRLP serializes b into the Ethereum RLP format.
func (b *Subject) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{b.View, b.Digest})
}
// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (b *Subject) DecodeRLP(s *rlp.Stream) error {
var subject struct {
View *View
Digest *types.Block
}
if err := s.Decode(&subject); err != nil {
return err
}
b.View, b.Digest = subject.View, subject.Digest
return nil
}
func (b *Subject) String() string {
return fmt.Sprintf("{View: %v, Proposal: %v}", b.View, b.Digest.String())
}
type RoundChangeMessage struct {
View *View
PreparedRound *big.Int
PreparedBlock istanbul.Proposal
PreparedMessages *messageSet
}
func (r *RoundChangeMessage) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{r.View, r.PreparedRound, r.PreparedBlock, r.PreparedMessages})
}
func (r *RoundChangeMessage) DecodeRLP(s *rlp.Stream) error {
var rcMessage struct {
View *View
PreparedRound *big.Int
PreparedBlock *types.Block
PreparedMessages *messageSet
}
if err := s.Decode(&rcMessage); err != nil {
return err
}
r.View, r.PreparedRound, r.PreparedBlock, r.PreparedMessages = rcMessage.View, rcMessage.PreparedRound, rcMessage.PreparedBlock, rcMessage.PreparedMessages
return nil
}

View File

@ -0,0 +1,300 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"math/big"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/rlp"
)
func testPreprepare(t *testing.T) {
valSet := newTestValidatorSet(4)
pp := &Preprepare{
View: &View{
Round: big.NewInt(1),
Sequence: big.NewInt(2),
},
Proposal: makeBlock(1),
RCMessages: newMessageSet(valSet),
}
prepreparePayload, _ := Encode(pp)
m := &message{
Code: msgPreprepare,
Msg: prepreparePayload,
Address: common.HexToAddress("0x1234567890"),
}
msgPayload, err := m.Payload()
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
decodedMsg := new(message)
err = decodedMsg.FromPayload(msgPayload, nil)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
var decodedPP *Preprepare
err = decodedMsg.Decode(&decodedPP)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// if block is encoded/decoded by rlp, we cannot to compare interface data type using reflect.DeepEqual. (like istanbul.Proposal)
// so individual comparison here.
if !reflect.DeepEqual(pp.Proposal.Hash(), decodedPP.Proposal.Hash()) {
t.Errorf("proposal hash mismatch: have %v, want %v", decodedPP.Proposal.Hash(), pp.Proposal.Hash())
}
if !reflect.DeepEqual(pp.View, decodedPP.View) {
t.Errorf("view mismatch: have %v, want %v", decodedPP.View, pp.View)
}
if !reflect.DeepEqual(pp.Proposal.Number(), decodedPP.Proposal.Number()) {
t.Errorf("proposal number mismatch: have %v, want %v", decodedPP.Proposal.Number(), pp.Proposal.Number())
}
}
func testSubject(t *testing.T) {
s := &Subject{
View: &View{
Round: big.NewInt(1),
Sequence: big.NewInt(2),
},
Digest: makeBlock(5),
}
subjectPayload, _ := Encode(s)
m := &message{
Code: msgPreprepare,
Msg: subjectPayload,
Address: common.HexToAddress("0x1234567890"),
}
msgPayload, err := m.Payload()
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
decodedMsg := new(message)
err = decodedMsg.FromPayload(msgPayload, nil)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
var decodedSub *Subject
err = decodedMsg.Decode(&decodedSub)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if !reflect.DeepEqual(s.View, decodedSub.View) || s.Digest.Hash().Hex() != decodedSub.Digest.Hash().Hex() {
t.Errorf("subject mismatch: have %v, want %v", decodedSub, s)
}
}
func testSubjectWithSignature(t *testing.T) {
s := &Subject{
View: &View{
Round: big.NewInt(1),
Sequence: big.NewInt(2),
},
Digest: makeBlock(5),
}
expectedSig := []byte{0x01}
subjectPayload, _ := Encode(s)
// 1. Encode test
address := common.HexToAddress("0x1234567890")
m := &message{
Code: msgPreprepare,
Msg: subjectPayload,
Address: address,
Signature: expectedSig,
CommittedSeal: []byte{},
}
msgPayload, err := m.Payload()
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// 2. Decode test
// 2.1 Test normal validate func
decodedMsg := new(message)
err = decodedMsg.FromPayload(msgPayload, func(data []byte, sig []byte) (common.Address, error) {
return address, nil
})
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
if !reflect.DeepEqual(decodedMsg, m) {
t.Errorf("error mismatch: have %v, want nil", err)
}
// 2.2 Test nil validate func
decodedMsg = new(message)
err = decodedMsg.FromPayload(msgPayload, nil)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(decodedMsg, m) {
t.Errorf("message mismatch: have %v, want %v", decodedMsg, m)
}
// 2.3 Test failed validate func
decodedMsg = new(message)
err = decodedMsg.FromPayload(msgPayload, func(data []byte, sig []byte) (common.Address, error) {
return common.Address{}, istanbul.ErrUnauthorizedAddress
})
if err != istanbul.ErrUnauthorizedAddress {
t.Errorf("error mismatch: have %v, want %v", err, istanbul.ErrUnauthorizedAddress)
}
}
func TestMessageEncodeDecode(t *testing.T) {
testPreprepare(t)
testSubject(t)
testSubjectWithSignature(t)
}
func TestViewCompare(t *testing.T) {
// test equality
srvView := &View{
Sequence: big.NewInt(2),
Round: big.NewInt(1),
}
tarView := &View{
Sequence: big.NewInt(2),
Round: big.NewInt(1),
}
if r := srvView.Cmp(tarView); r != 0 {
t.Errorf("source(%v) should be equal to target(%v): have %v, want %v", srvView, tarView, r, 0)
}
// test larger Sequence
tarView = &View{
Sequence: big.NewInt(1),
Round: big.NewInt(1),
}
if r := srvView.Cmp(tarView); r != 1 {
t.Errorf("source(%v) should be larger than target(%v): have %v, want %v", srvView, tarView, r, 1)
}
// test larger Round
tarView = &View{
Sequence: big.NewInt(2),
Round: big.NewInt(0),
}
if r := srvView.Cmp(tarView); r != 1 {
t.Errorf("source(%v) should be larger than target(%v): have %v, want %v", srvView, tarView, r, 1)
}
// test smaller Sequence
tarView = &View{
Sequence: big.NewInt(3),
Round: big.NewInt(1),
}
if r := srvView.Cmp(tarView); r != -1 {
t.Errorf("source(%v) should be smaller than target(%v): have %v, want %v", srvView, tarView, r, -1)
}
tarView = &View{
Sequence: big.NewInt(2),
Round: big.NewInt(2),
}
if r := srvView.Cmp(tarView); r != -1 {
t.Errorf("source(%v) should be smaller than target(%v): have %v, want %v", srvView, tarView, r, -1)
}
}
func TestPreprepareEncodeDecode(t *testing.T) {
valSet := newTestValidatorSet(4)
view := &View{
Round: big.NewInt(1),
Sequence: big.NewInt(5),
}
proposal := makeBlock(5)
preprepare := &Preprepare{
View: view,
Proposal: proposal,
RCMessages: newMessageSet(valSet),
}
rawPreprepare, err := rlp.EncodeToBytes(preprepare)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// decode preprepare message
msg := &message{
Code: msgPreprepare,
Msg: rawPreprepare,
Address: common.Address{},
}
var decPreprepare *Preprepare
err = msg.Decode(&decPreprepare)
if err != nil {
t.Errorf("error decoding preprepare message: %v", err)
}
if decPreprepare.Proposal.Hash() != proposal.Hash() {
t.Errorf("error mismatch proposal hash: have %v, want %v", decPreprepare.Proposal.Hash(), proposal.Hash())
}
}
func TestRCEncodeDeocdeRLP(t *testing.T) {
view := &View{
Round: big.NewInt(1),
Sequence: big.NewInt(5),
}
rc := &RoundChangeMessage{
View: view,
PreparedRound: big.NewInt(0),
PreparedBlock: makeBlock(5),
}
rawRC, err := rlp.EncodeToBytes(rc)
if err != nil {
t.Errorf("error mismatch: have %v, want nil", err)
}
// decode roundchange message
msg := &message{
Code: msgRoundChange,
Msg: rawRC,
Address: common.Address{},
}
var decRC *RoundChangeMessage
err = msg.Decode(&decRC)
if err != nil {
t.Errorf("error decoding roundchange message: %v", err)
}
if decRC.View.Round.Uint64() != view.Round.Uint64() {
t.Errorf("error mismatch view: have %v, want %v", decRC.View.Round.Uint64(), view.Round.Uint64())
}
}

40
consensus/qibft/types.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package istanbul
import (
"io"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
)
// Proposal supports retrieving height and serialized block to be used during Istanbul consensus.
type Proposal interface {
// Number retrieves the sequence number of this proposal.
Number() *big.Int
// Hash retrieves the hash of this proposal.
Hash() common.Hash
EncodeRLP(w io.Writer) error
DecodeRLP(s *rlp.Stream) error
String() string
}

View File

@ -284,6 +284,11 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo
config.Istanbul.Ceil2Nby3Block = chainConfig.Istanbul.Ceil2Nby3Block
config.Istanbul.AllowedFutureBlockTime = config.AllowedFutureBlockTime
config.Istanbul.QibftBlock = chainConfig.Istanbul.QibftBlock
if chainConfig.Istanbul.QibftBlock == nil {
config.Istanbul.QibftBlock = big.NewInt(0)
}
return istanbulBackend.New(&config.Istanbul, ctx.NodeKey(), db)
}

View File

@ -51,7 +51,7 @@ func TestNodeInfo(t *testing.T) {
}{
{"ethash", nil, nil, false},
{"raft", nil, nil, true},
{"istanbul", nil, &params.IstanbulConfig{1, 1, big.NewInt(0), 0}, false},
{"istanbul", nil, &params.IstanbulConfig{1, 1, big.NewInt(0), 0, big.NewInt(0)}, false},
{"clique", &params.CliqueConfig{1, 1, 0}, nil, false},
}

View File

@ -346,6 +346,7 @@ type IstanbulConfig struct {
ProposerPolicy uint64 `json:"policy"` // The policy for proposer selection
Ceil2Nby3Block *big.Int `json:"ceil2Nby3Block,omitempty"` // Number of confirmations required to move from one state to next [2F + 1 to Ceil(2N/3)]
AllowedFutureBlockTime uint64 `json:"allowedFutureBlockTime"` // Max time (in seconds) from current time allowed for blocks, before they're considered future blocks
QibftBlock *big.Int `json:"qibftBlock,omitempty"` // Fork block at which block confirmations are done using qibft consensus instead of ibft
}
// String implements the stringer interface, returning the consensus engine details.
@ -677,6 +678,9 @@ func (c *ChainConfig) checkCompatible(newcfg *ChainConfig, head *big.Int, isQuor
if c.Istanbul != nil && newcfg.Istanbul != nil && isForkIncompatible(c.Istanbul.Ceil2Nby3Block, newcfg.Istanbul.Ceil2Nby3Block, head) {
return newCompatError("Ceil 2N/3 fork block", c.Istanbul.Ceil2Nby3Block, newcfg.Istanbul.Ceil2Nby3Block)
}
if c.Istanbul != nil && newcfg.Istanbul != nil && isForkIncompatible(c.Istanbul.QibftBlock, newcfg.Istanbul.QibftBlock, head) {
return newCompatError("Qibft fork block", c.Istanbul.QibftBlock, newcfg.Istanbul.QibftBlock)
}
if isForkIncompatible(c.QIP714Block, newcfg.QIP714Block, head) {
return newCompatError("permissions fork block", c.QIP714Block, newcfg.QIP714Block)
}

View File

@ -175,6 +175,23 @@ func TestCheckCompatible(t *testing.T) {
RewindTo: 9,
},
},
{
stored: &ChainConfig{Istanbul: &IstanbulConfig{QibftBlock: big.NewInt(50)}},
new: &ChainConfig{Istanbul: &IstanbulConfig{QibftBlock: big.NewInt(60)}},
head: 40,
wantErr: nil,
},
{
stored: &ChainConfig{Istanbul: &IstanbulConfig{QibftBlock: big.NewInt(20)}},
new: &ChainConfig{Istanbul: &IstanbulConfig{QibftBlock: big.NewInt(30)}},
head: 20,
wantErr: &ConfigCompatError{
What: "Qibft fork block",
StoredConfig: big.NewInt(20),
NewConfig: big.NewInt(30),
RewindTo: 19,
},
},
{
stored: &ChainConfig{MaxCodeSizeChangeBlock: big.NewInt(10)},
new: &ChainConfig{MaxCodeSizeChangeBlock: big.NewInt(20)},