tendermint/evidence/reactor.go

221 lines
6.3 KiB
Go
Raw Normal View History

2017-11-19 20:22:25 -08:00
package evidence
2017-11-02 11:06:48 -07:00
import (
"fmt"
"reflect"
2017-11-02 17:26:07 -07:00
"time"
2017-11-02 11:06:48 -07:00
amino "github.com/tendermint/go-amino"
2018-07-01 19:36:49 -07:00
clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
2017-11-02 11:06:48 -07:00
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
const (
EvidenceChannel = byte(0x38)
2017-11-02 11:06:48 -07:00
2018-06-04 17:38:44 -07:00
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
2017-11-02 11:06:48 -07:00
)
// EvidenceReactor handles evpool evidence broadcasting amongst peers.
type EvidenceReactor struct {
2017-11-02 11:06:48 -07:00
p2p.BaseReactor
evpool *EvidencePool
eventBus *types.EventBus
2017-11-02 11:06:48 -07:00
}
// NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
2017-12-26 17:34:57 -08:00
func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
2017-11-02 17:26:07 -07:00
evpool: evpool,
2017-11-02 11:06:48 -07:00
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
2017-11-02 11:06:48 -07:00
return evR
}
// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *EvidenceReactor) SetLogger(l log.Logger) {
2017-11-02 11:06:48 -07:00
evR.Logger = l
2017-11-02 17:26:07 -07:00
evR.evpool.SetLogger(l)
}
2017-11-02 11:06:48 -07:00
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
2017-11-02 11:06:48 -07:00
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
ID: EvidenceChannel,
2017-11-02 11:06:48 -07:00
Priority: 5,
},
}
}
// AddPeer implements Reactor.
func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
2018-06-04 17:38:44 -07:00
go evR.broadcastEvidenceRoutine(peer)
2017-11-02 11:06:48 -07:00
}
// RemovePeer implements Reactor.
func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// nothing to do
2017-11-02 11:06:48 -07:00
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
2017-11-02 11:06:48 -07:00
if err != nil {
2018-03-04 01:42:45 -08:00
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
evR.Switch.StopPeerForError(src, err)
2017-11-02 11:06:48 -07:00
return
}
evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *EvidenceListMessage:
2017-11-02 11:06:48 -07:00
for _, ev := range msg.Evidence {
2017-11-02 17:26:07 -07:00
err := evR.evpool.AddEvidence(ev)
2017-11-02 11:06:48 -07:00
if err != nil {
evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
2018-03-04 02:22:58 -08:00
// punish peer
evR.Switch.StopPeerForError(src, err)
2017-11-02 11:06:48 -07:00
}
}
default:
evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// SetEventSwitch implements events.Eventable.
func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
2017-11-02 11:06:48 -07:00
}
2018-06-04 17:38:44 -07:00
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routien that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
var next *clist.CElement
2017-11-02 17:26:07 -07:00
for {
2018-06-04 17:38:44 -07:00
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
continue
}
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
2018-06-04 17:38:44 -07:00
ev := next.Value.(types.Evidence)
2018-06-04 21:50:42 -07:00
msg, retry := evR.checkSendEvidenceMessage(peer, ev)
if msg != nil {
success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
retry = !success
2018-06-04 17:38:44 -07:00
}
2018-06-04 21:50:42 -07:00
if retry {
2018-06-04 17:38:44 -07:00
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
2018-06-04 21:50:42 -07:00
// Returns the message to send the peer, or nil if the evidence is invalid for the peer.
// If message is nil, return true if we should sleep and try again.
func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) {
// make sure the peer is up to date
evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
evR.Logger.Info("Found peer without PeerState", "peer", peer)
return nil, true
}
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge
peerHeight := peerState.GetHeight()
if peerHeight < evHeight {
// peer is behind. sleep while he catches up
return nil, true
} else if peerHeight > evHeight+maxAge {
// evidence is too old, skip
// NOTE: if evidence is too old for an honest peer,
// then we're behind and either it already got committed or it never will!
evR.Logger.Info("Not sending peer old evidence", "peerHeight", peerHeight, "evHeight", evHeight, "maxAge", maxAge, "peer", peer)
return nil, false
}
// send evidence
msg = &EvidenceListMessage{[]types.Evidence{ev}}
return msg, false
}
2018-06-04 17:38:44 -07:00
// PeerState describes the state of a peer.
type PeerState interface {
2018-06-04 17:38:44 -07:00
GetHeight() int64
}
2017-11-02 11:06:48 -07:00
//-----------------------------------------------------------------------------
// Messages
// EvidenceMessage is a message sent or received by the EvidenceReactor.
type EvidenceMessage interface{}
2017-11-02 11:06:48 -07:00
2018-04-05 05:43:23 -07:00
func RegisterEvidenceMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*EvidenceMessage)(nil), nil)
cdc.RegisterConcrete(&EvidenceListMessage{},
2018-04-06 13:46:40 -07:00
"tendermint/evidence/EvidenceListMessage", nil)
2018-04-05 05:43:23 -07:00
}
2017-11-02 11:06:48 -07:00
func decodeMsg(bz []byte) (msg EvidenceMessage, err error) {
2018-04-09 05:14:33 -07:00
if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
2018-04-09 05:14:33 -07:00
}
2018-04-05 05:43:23 -07:00
err = cdc.UnmarshalBinaryBare(bz, &msg)
2017-11-02 11:06:48 -07:00
return
}
//-------------------------------------
// EvidenceMessage contains a list of evidence.
type EvidenceListMessage struct {
2017-11-02 17:26:07 -07:00
Evidence []types.Evidence
2017-11-02 11:06:48 -07:00
}
// String returns a string representation of the EvidenceListMessage.
func (m *EvidenceListMessage) String() string {
return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
2017-11-02 11:06:48 -07:00
}