mirror of https://github.com/poanetwork/quorum.git
170 lines
4.4 KiB
Go
170 lines
4.4 KiB
Go
|
// Copyright 2017 The go-ethereum Authors
|
||
|
// This file is part of the go-ethereum library.
|
||
|
//
|
||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||
|
// the Free Software Foundation, either version 3 of the License, or
|
||
|
// (at your option) any later version.
|
||
|
//
|
||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Lesser General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Lesser General Public License
|
||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
package core
|
||
|
|
||
|
import (
|
||
|
"github.com/ethereum/go-ethereum/consensus/istanbul"
|
||
|
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// msgPriority is defined for calculating processing priority to speedup consensus
|
||
|
// msgPreprepare > msgCommit > msgPrepare
|
||
|
msgPriority = map[uint64]int{
|
||
|
msgPreprepare: 1,
|
||
|
msgCommit: 2,
|
||
|
msgPrepare: 3,
|
||
|
}
|
||
|
)
|
||
|
|
||
|
// checkMessage checks the message state
|
||
|
// return errInvalidMessage if the message is invalid
|
||
|
// return errFutureMessage if the message view is larger than current view
|
||
|
// return errOldMessage if the message view is smaller than current view
|
||
|
func (c *core) checkMessage(msgCode uint64, view *istanbul.View) error {
|
||
|
if view == nil || view.Sequence == nil || view.Round == nil {
|
||
|
return errInvalidMessage
|
||
|
}
|
||
|
|
||
|
if view.Cmp(c.currentView()) > 0 {
|
||
|
return errFutureMessage
|
||
|
}
|
||
|
|
||
|
if view.Cmp(c.currentView()) < 0 {
|
||
|
return errOldMessage
|
||
|
}
|
||
|
|
||
|
if c.waitingForRoundChange {
|
||
|
return errFutureMessage
|
||
|
}
|
||
|
|
||
|
// StateAcceptRequest only accepts msgPreprepare
|
||
|
// other messages are future messages
|
||
|
if c.state == StateAcceptRequest {
|
||
|
if msgCode > msgPreprepare {
|
||
|
return errFutureMessage
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// For states(StatePreprepared, StatePrepared, StateCommitted),
|
||
|
// can accept all message types if processing with same view
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
|
||
|
logger := c.logger.New("from", src, "state", c.state)
|
||
|
|
||
|
if src.Address() == c.Address() {
|
||
|
logger.Warn("Backlog from self")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
logger.Trace("Store future message")
|
||
|
|
||
|
c.backlogsMu.Lock()
|
||
|
defer c.backlogsMu.Unlock()
|
||
|
|
||
|
backlog := c.backlogs[src]
|
||
|
if backlog == nil {
|
||
|
backlog = prque.New()
|
||
|
}
|
||
|
switch msg.Code {
|
||
|
case msgPreprepare:
|
||
|
var p *istanbul.Preprepare
|
||
|
err := msg.Decode(&p)
|
||
|
if err == nil {
|
||
|
backlog.Push(msg, toPriority(msg.Code, p.View))
|
||
|
}
|
||
|
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
|
||
|
default:
|
||
|
var p *istanbul.Subject
|
||
|
err := msg.Decode(&p)
|
||
|
if err == nil {
|
||
|
backlog.Push(msg, toPriority(msg.Code, p.View))
|
||
|
}
|
||
|
}
|
||
|
c.backlogs[src] = backlog
|
||
|
}
|
||
|
|
||
|
func (c *core) processBacklog() {
|
||
|
c.backlogsMu.Lock()
|
||
|
defer c.backlogsMu.Unlock()
|
||
|
|
||
|
for src, backlog := range c.backlogs {
|
||
|
if backlog == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
logger := c.logger.New("from", src, "state", c.state)
|
||
|
isFuture := false
|
||
|
|
||
|
// We stop processing if
|
||
|
// 1. backlog is empty
|
||
|
// 2. The first message in queue is a future message
|
||
|
for !(backlog.Empty() || isFuture) {
|
||
|
m, prio := backlog.Pop()
|
||
|
msg := m.(*message)
|
||
|
var view *istanbul.View
|
||
|
switch msg.Code {
|
||
|
case msgPreprepare:
|
||
|
var m *istanbul.Preprepare
|
||
|
err := msg.Decode(&m)
|
||
|
if err == nil {
|
||
|
view = m.View
|
||
|
}
|
||
|
// for istanbul.MsgPrepare and istanbul.MsgCommit cases
|
||
|
default:
|
||
|
var sub *istanbul.Subject
|
||
|
err := msg.Decode(&sub)
|
||
|
if err == nil {
|
||
|
view = sub.View
|
||
|
}
|
||
|
}
|
||
|
if view == nil {
|
||
|
logger.Debug("Nil view", "msg", msg)
|
||
|
continue
|
||
|
}
|
||
|
// Push back if it's a future message
|
||
|
err := c.checkMessage(msg.Code, view)
|
||
|
if err != nil {
|
||
|
if err == errFutureMessage {
|
||
|
logger.Trace("Stop processing backlog", "msg", msg)
|
||
|
backlog.Push(msg, prio)
|
||
|
isFuture = true
|
||
|
break
|
||
|
}
|
||
|
logger.Trace("Skip the backlog event", "msg", msg, "err", err)
|
||
|
continue
|
||
|
}
|
||
|
logger.Trace("Post backlog event", "msg", msg)
|
||
|
|
||
|
go c.sendEvent(backlogEvent{
|
||
|
src: src,
|
||
|
msg: msg,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func toPriority(msgCode uint64, view *istanbul.View) float32 {
|
||
|
// FIXME: round will be reset as 0 while new sequence
|
||
|
// 10 * Round limits the range of message code is from 0 to 9
|
||
|
// 1000 * Sequence limits the range of round is from 0 to 99
|
||
|
return -float32(view.Sequence.Uint64()*1000 + view.Round.Uint64()*10 + uint64(msgPriority[msgCode]))
|
||
|
}
|