mirror of https://github.com/poanetwork/quorum.git
309 lines
9.4 KiB
Go
309 lines
9.4 KiB
Go
|
// Copyright 2017 The go-ethereum Authors
|
||
|
// This file is part of the go-ethereum library.
|
||
|
//
|
||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||
|
// the Free Software Foundation, either version 3 of the License, or
|
||
|
// (at your option) any later version.
|
||
|
//
|
||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Lesser General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Lesser General Public License
|
||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
package core
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"math"
|
||
|
"math/big"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ethereum/go-ethereum/common"
|
||
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||
|
"github.com/ethereum/go-ethereum/core/types"
|
||
|
"github.com/ethereum/go-ethereum/event"
|
||
|
"github.com/ethereum/go-ethereum/log"
|
||
|
metrics "github.com/ethereum/go-ethereum/metrics"
|
||
|
goMetrics "github.com/rcrowley/go-metrics"
|
||
|
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
|
||
|
)
|
||
|
|
||
|
// New creates an Istanbul consensus core
|
||
|
func New(backend istanbul.Backend, config *istanbul.Config) Engine {
|
||
|
c := &core{
|
||
|
config: config,
|
||
|
address: backend.Address(),
|
||
|
state: StateAcceptRequest,
|
||
|
logger: log.New("address", backend.Address()),
|
||
|
backend: backend,
|
||
|
backlogs: make(map[istanbul.Validator]*prque.Prque),
|
||
|
backlogsMu: new(sync.Mutex),
|
||
|
pendingRequests: prque.New(),
|
||
|
pendingRequestsMu: new(sync.Mutex),
|
||
|
consensusTimestamp: time.Time{},
|
||
|
roundMeter: metrics.NewMeter("consensus/istanbul/core/round"),
|
||
|
sequenceMeter: metrics.NewMeter("consensus/istanbul/core/sequence"),
|
||
|
consensusTimer: metrics.NewTimer("consensus/istanbul/core/consensus"),
|
||
|
}
|
||
|
c.validateFn = c.checkValidatorSignature
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
// ----------------------------------------------------------------------------
|
||
|
|
||
|
type core struct {
|
||
|
config *istanbul.Config
|
||
|
address common.Address
|
||
|
state State
|
||
|
logger log.Logger
|
||
|
|
||
|
backend istanbul.Backend
|
||
|
events *event.TypeMuxSubscription
|
||
|
finalCommittedSub *event.TypeMuxSubscription
|
||
|
timeoutSub *event.TypeMuxSubscription
|
||
|
futurePreprepareTimer *time.Timer
|
||
|
|
||
|
lastProposer common.Address
|
||
|
lastProposal istanbul.Proposal
|
||
|
valSet istanbul.ValidatorSet
|
||
|
waitingForRoundChange bool
|
||
|
validateFn func([]byte, []byte) (common.Address, error)
|
||
|
|
||
|
backlogs map[istanbul.Validator]*prque.Prque
|
||
|
backlogsMu *sync.Mutex
|
||
|
|
||
|
current *roundState
|
||
|
|
||
|
roundChangeSet *roundChangeSet
|
||
|
roundChangeTimer *time.Timer
|
||
|
|
||
|
pendingRequests *prque.Prque
|
||
|
pendingRequestsMu *sync.Mutex
|
||
|
|
||
|
consensusTimestamp time.Time
|
||
|
// the meter to record the round change rate
|
||
|
roundMeter goMetrics.Meter
|
||
|
// the meter to record the sequence update rate
|
||
|
sequenceMeter goMetrics.Meter
|
||
|
// the timer to record consensus duration (from accepting a preprepare to final committed stage)
|
||
|
consensusTimer goMetrics.Timer
|
||
|
}
|
||
|
|
||
|
func (c *core) finalizeMessage(msg *message) ([]byte, error) {
|
||
|
var err error
|
||
|
// Add sender address
|
||
|
msg.Address = c.Address()
|
||
|
|
||
|
// Add proof of consensus
|
||
|
msg.CommittedSeal = []byte{}
|
||
|
// Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
|
||
|
if msg.Code == msgCommit && c.current.Proposal() != nil {
|
||
|
seal := PrepareCommittedSeal(c.current.Proposal().Hash())
|
||
|
msg.CommittedSeal, err = c.backend.Sign(seal)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Sign message
|
||
|
data, err := msg.PayloadNoSig()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
msg.Signature, err = c.backend.Sign(data)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// Convert to payload
|
||
|
payload, err := msg.Payload()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return payload, nil
|
||
|
}
|
||
|
|
||
|
func (c *core) broadcast(msg *message) {
|
||
|
logger := c.logger.New("state", c.state)
|
||
|
|
||
|
payload, err := c.finalizeMessage(msg)
|
||
|
if err != nil {
|
||
|
logger.Error("Failed to finalize message", "msg", msg, "err", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Broadcast payload
|
||
|
if err = c.backend.Broadcast(c.valSet, payload); err != nil {
|
||
|
logger.Error("Failed to broadcast message", "msg", msg, "err", err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) currentView() *istanbul.View {
|
||
|
return &istanbul.View{
|
||
|
Sequence: new(big.Int).Set(c.current.Sequence()),
|
||
|
Round: new(big.Int).Set(c.current.Round()),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) isProposer() bool {
|
||
|
v := c.valSet
|
||
|
if v == nil {
|
||
|
return false
|
||
|
}
|
||
|
return v.IsProposer(c.backend.Address())
|
||
|
}
|
||
|
|
||
|
func (c *core) commit() {
|
||
|
c.setState(StateCommitted)
|
||
|
|
||
|
proposal := c.current.Proposal()
|
||
|
if proposal != nil {
|
||
|
committedSeals := make([][]byte, c.current.Commits.Size())
|
||
|
for i, v := range c.current.Commits.Values() {
|
||
|
committedSeals[i] = make([]byte, types.IstanbulExtraSeal)
|
||
|
copy(committedSeals[i][:], v.CommittedSeal[:])
|
||
|
}
|
||
|
|
||
|
if err := c.backend.Commit(proposal, committedSeals); err != nil {
|
||
|
c.current.UnlockHash() //Unlock block when insertion fails
|
||
|
c.sendNextRoundChange()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) startNewRound(newView *istanbul.View, lastProposal istanbul.Proposal, lastProposer common.Address, roundChange bool) {
|
||
|
var logger log.Logger
|
||
|
if c.current == nil {
|
||
|
logger = c.logger.New("old_round", -1, "old_seq", 0, "old_proposer", c.valSet.GetProposer())
|
||
|
} else {
|
||
|
logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
|
||
|
}
|
||
|
|
||
|
// Try to get last proposal
|
||
|
if lastProposal == nil {
|
||
|
lastProposal, lastProposer = c.backend.LastProposal()
|
||
|
if lastProposal.Number().Cmp(newView.Sequence) > 0 {
|
||
|
newView = &istanbul.View{
|
||
|
Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
|
||
|
Round: new(big.Int),
|
||
|
}
|
||
|
c.lastProposal = lastProposal
|
||
|
c.lastProposer = lastProposer
|
||
|
logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
c.valSet = c.backend.Validators(c.lastProposal)
|
||
|
// Clear invalid ROUND CHANGE messages
|
||
|
c.roundChangeSet = newRoundChangeSet(c.valSet)
|
||
|
// New snapshot for new round
|
||
|
c.updateRoundState(newView, c.valSet, roundChange)
|
||
|
// Calculate new proposer
|
||
|
c.valSet.CalcProposer(c.lastProposer, newView.Round.Uint64())
|
||
|
c.waitingForRoundChange = false
|
||
|
c.setState(StateAcceptRequest)
|
||
|
if roundChange && c.isProposer() && c.current != nil {
|
||
|
// If it is locked, propose the old proposal
|
||
|
// If we have pending request, propose pending request
|
||
|
if c.current.IsHashLocked() {
|
||
|
r := &istanbul.Request{
|
||
|
Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
|
||
|
}
|
||
|
c.sendPreprepare(r)
|
||
|
} else if c.current.pendingRequest != nil {
|
||
|
c.sendPreprepare(c.current.pendingRequest)
|
||
|
}
|
||
|
}
|
||
|
c.newRoundChangeTimer()
|
||
|
|
||
|
logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "isProposer", c.isProposer())
|
||
|
}
|
||
|
|
||
|
func (c *core) catchUpRound(view *istanbul.View) {
|
||
|
logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
|
||
|
|
||
|
if view.Round.Cmp(c.current.Round()) > 0 {
|
||
|
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
|
||
|
}
|
||
|
c.waitingForRoundChange = true
|
||
|
|
||
|
// Need to keep block locked for round catching up
|
||
|
c.updateRoundState(view, c.valSet, true)
|
||
|
c.roundChangeSet.Clear(view.Round)
|
||
|
c.newRoundChangeTimer()
|
||
|
|
||
|
logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
|
||
|
}
|
||
|
|
||
|
// updateRoundState updates round state by checking if locking block is necessary
|
||
|
func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
|
||
|
// Lock only if both roundChange is true and it is locked
|
||
|
if roundChange && c.current != nil {
|
||
|
if c.current.IsHashLocked() {
|
||
|
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest)
|
||
|
} else {
|
||
|
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest)
|
||
|
}
|
||
|
} else {
|
||
|
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) setState(state State) {
|
||
|
if c.state != state {
|
||
|
c.state = state
|
||
|
}
|
||
|
if state == StateAcceptRequest {
|
||
|
c.processPendingRequests()
|
||
|
}
|
||
|
c.processBacklog()
|
||
|
}
|
||
|
|
||
|
func (c *core) Address() common.Address {
|
||
|
return c.address
|
||
|
}
|
||
|
|
||
|
func (c *core) stopFuturePreprepareTimer() {
|
||
|
if c.futurePreprepareTimer != nil {
|
||
|
c.futurePreprepareTimer.Stop()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) stopTimer() {
|
||
|
c.stopFuturePreprepareTimer()
|
||
|
if c.roundChangeTimer != nil {
|
||
|
c.roundChangeTimer.Stop()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *core) newRoundChangeTimer() {
|
||
|
c.stopTimer()
|
||
|
|
||
|
// set timeout based on the round number
|
||
|
t := uint64(math.Pow(2, float64(c.current.Round().Uint64()))) * c.config.RequestTimeout
|
||
|
timeout := time.Duration(t) * time.Millisecond
|
||
|
c.roundChangeTimer = time.AfterFunc(timeout, func() {
|
||
|
c.sendEvent(timeoutEvent{})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
|
||
|
return istanbul.CheckValidatorSignature(c.valSet, data, sig)
|
||
|
}
|
||
|
|
||
|
// PrepareCommittedSeal returns a committed seal for the given hash
|
||
|
func PrepareCommittedSeal(hash common.Hash) []byte {
|
||
|
var buf bytes.Buffer
|
||
|
buf.Write(hash.Bytes())
|
||
|
buf.Write([]byte{byte(msgCommit)})
|
||
|
return buf.Bytes()
|
||
|
}
|