tendermint/evidence/reactor.go

170 lines
4.7 KiB
Go
Raw Normal View History

2017-11-19 20:22:25 -08:00
package evidence
2017-11-02 11:06:48 -07:00
import (
"bytes"
"fmt"
"reflect"
2017-11-02 17:26:07 -07:00
"time"
2017-11-02 11:06:48 -07:00
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
// Channel identifier and tuning parameters for the evidence reactor.
const (
	// EvidenceChannel is the p2p channel ID used for evidence messages.
	EvidenceChannel byte = 0x38

	// maxEvidenceMessageSize is the size limit handed to the decoder (1MB).
	// TODO make it configurable
	maxEvidenceMessageSize = 1 << 20

	// broadcastEvidenceIntervalS is the period, in seconds, between
	// re-broadcasts of uncommitted evidence.
	broadcastEvidenceIntervalS = 60
)
// EvidenceReactor handles evpool evidence broadcasting amongst peers.
type EvidenceReactor struct {
2017-11-02 11:06:48 -07:00
p2p.BaseReactor
evpool *EvidencePool
eventBus *types.EventBus
2017-11-02 11:06:48 -07:00
}
// NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
2017-12-26 17:34:57 -08:00
func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
2017-11-02 17:26:07 -07:00
evpool: evpool,
2017-11-02 11:06:48 -07:00
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
2017-11-02 11:06:48 -07:00
return evR
}
// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *EvidenceReactor) SetLogger(l log.Logger) {
2017-11-02 11:06:48 -07:00
evR.Logger = l
2017-11-02 17:26:07 -07:00
evR.evpool.SetLogger(l)
}
// OnStart implements cmn.Service
func (evR *EvidenceReactor) OnStart() error {
2017-11-02 17:26:07 -07:00
if err := evR.BaseReactor.OnStart(); err != nil {
return err
}
go evR.broadcastRoutine()
return nil
2017-11-02 11:06:48 -07:00
}
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
2017-11-02 11:06:48 -07:00
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
ID: EvidenceChannel,
2017-11-02 11:06:48 -07:00
Priority: 5,
},
}
}
// AddPeer implements Reactor.
func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
// send the peer our high-priority evidence.
// the rest will be sent by the broadcastRoutine
2017-12-26 22:27:03 -08:00
evidences := evR.evpool.PriorityEvidence()
msg := &EvidenceListMessage{evidences}
success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg})
2017-11-02 11:06:48 -07:00
if !success {
// TODO: remove peer ?
}
}
// RemovePeer implements Reactor.
func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// nothing to do
2017-11-02 11:06:48 -07:00
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
2017-11-02 11:06:48 -07:00
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "err", err)
return
}
evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *EvidenceListMessage:
2017-11-02 11:06:48 -07:00
for _, ev := range msg.Evidence {
2017-11-02 17:26:07 -07:00
err := evR.evpool.AddEvidence(ev)
2017-11-02 11:06:48 -07:00
if err != nil {
evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
// TODO: punish peer
}
}
default:
evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// SetEventSwitch implements events.Eventable.
func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
2017-11-02 11:06:48 -07:00
}
2017-12-26 22:27:03 -08:00
// Broadcast new evidence to all peers.
// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
func (evR *EvidenceReactor) broadcastRoutine() {
2017-11-02 17:26:07 -07:00
ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
for {
select {
2017-11-18 16:57:55 -08:00
case evidence := <-evR.evpool.EvidenceChan():
2017-11-02 17:26:07 -07:00
// broadcast some new evidence
2017-11-19 20:32:53 -08:00
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
2017-11-02 17:26:07 -07:00
// TODO: Broadcast runs asynchronously, so this should wait on the successChan
2017-11-02 17:26:07 -07:00
// in another routine before marking to be proper.
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
2017-11-02 17:26:07 -07:00
case <-ticker.C:
// broadcast all pending evidence
2017-11-19 20:32:53 -08:00
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
case <-evR.Quit():
2017-11-02 17:26:07 -07:00
return
}
}
}
2017-11-02 11:06:48 -07:00
//-----------------------------------------------------------------------------
// Messages
// msgTypeEvidence is the type byte under which EvidenceListMessage is
// registered with go-wire.
const msgTypeEvidence = byte(0x01)
// EvidenceMessage is a message sent or received by the EvidenceReactor.
// It is declared as an empty interface; on the wire it is always wrapped in
// struct{ EvidenceMessage } so go-wire can encode and decode the concrete type.
type EvidenceMessage interface{}
2017-11-02 11:06:48 -07:00
var _ = wire.RegisterInterface(
struct{ EvidenceMessage }{},
wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence},
2017-11-02 11:06:48 -07:00
)
// DecodeMessage decodes a byte-array into a EvidenceMessage.
func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) {
2017-11-02 11:06:48 -07:00
msgType = bz[0]
n := new(int)
r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage
2017-11-02 11:06:48 -07:00
return
}
//-------------------------------------
// EvidenceMessage contains a list of evidence.
type EvidenceListMessage struct {
2017-11-02 17:26:07 -07:00
Evidence []types.Evidence
2017-11-02 11:06:48 -07:00
}
// String returns a string representation of the EvidenceListMessage.
func (m *EvidenceListMessage) String() string {
return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
2017-11-02 11:06:48 -07:00
}