tendermint/peer.go

142 lines
3.0 KiB
Go
Raw Normal View History

2015-10-25 18:21:51 -07:00
package p2p
import (
"fmt"
"io"
"net"
. "github.com/tendermint/go-common"
2016-05-08 14:59:27 -07:00
cfg "github.com/tendermint/go-config"
2015-10-25 18:21:51 -07:00
"github.com/tendermint/go-wire"
)
// Peer is one remote node, wrapping the MConnection used to talk to it.
// It embeds BaseService (initialised in newPeer) and the *NodeInfo that
// newPeer was given for the remote side.
type Peer struct {
BaseService
outbound bool // copied from newPeer's outbound argument; reported by IsOutbound
mconn *MConnection // underlying connection; started in OnStart, stopped in OnStop
*NodeInfo
Key string // set by newPeer to peerNodeInfo.PubKey.KeyString(); compared by Equals
Data *CMap // User data.
}
// NOTE: blocking
// Before creating a peer with newPeer(), perform a handshake on connection.
func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
var peerNodeInfo = new(NodeInfo)
var err1 error
var err2 error
Parallel(
func() {
2015-11-10 12:29:43 -08:00
var n int
2015-10-25 18:21:51 -07:00
wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
},
func() {
2015-11-10 12:29:43 -08:00
var n int
wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
2015-10-25 18:21:51 -07:00
log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
if err1 != nil {
return nil, err1
}
if err2 != nil {
return nil, err2
}
peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
2015-10-25 18:21:51 -07:00
return peerNodeInfo, nil
}
// NOTE: call peerHandshake on conn before calling newPeer().
2016-05-08 14:59:27 -07:00
func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
2015-10-25 18:21:51 -07:00
var p *Peer
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
PanicSanity(Fmt("Unknown channel %X", chID))
}
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
p.Stop()
onPeerError(p, r)
}
2017-04-06 05:43:45 -07:00
mconnConfig := &MConnectionConfig{
SendRate: int64(config.GetInt(configKeySendRate)),
RecvRate: int64(config.GetInt(configKeyRecvRate)),
}
mconn := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig)
2015-10-25 18:21:51 -07:00
p = &Peer{
outbound: outbound,
mconn: mconn,
NodeInfo: peerNodeInfo,
Key: peerNodeInfo.PubKey.KeyString(),
Data: NewCMap(),
}
p.BaseService = *NewBaseService(log, "Peer", p)
return p
}
// OnStart runs the embedded BaseService's start hook and then starts the
// underlying MConnection. It returns the first error encountered; the
// connection is not started if the BaseService hook fails.
func (p *Peer) OnStart() error {
	if err := p.BaseService.OnStart(); err != nil {
		return err
	}
	_, err := p.mconn.Start()
	return err
}
// OnStop runs the embedded BaseService's stop hook, then stops the underlying
// MConnection.
func (p *Peer) OnStop() {
	p.BaseService.OnStop()

	conn := p.mconn
	conn.Stop()
}
// Connection returns the MConnection this peer communicates over.
func (p *Peer) Connection() *MConnection {
	conn := p.mconn
	return conn
}
// IsOutbound reports the value of the peer's outbound flag.
func (p *Peer) IsOutbound() bool {
	isOut := p.outbound
	return isOut
}
// Send passes msg to the connection's Send on channel chID and returns its
// result. If the peer is not running it returns false without touching the
// connection.
func (p *Peer) Send(chID byte, msg interface{}) bool {
	// && short-circuits, so the connection is only used while running.
	return p.IsRunning() && p.mconn.Send(chID, msg)
}
// TrySend passes msg to the connection's TrySend on channel chID and returns
// its result. If the peer is not running it returns false without touching the
// connection.
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
	// && short-circuits, so the connection is only used while running.
	return p.IsRunning() && p.mconn.TrySend(chID, msg)
}
// CanSend returns the connection's CanSend result for channel chID, or false
// if the peer is not running.
func (p *Peer) CanSend(chID byte) bool {
	// && short-circuits, so the connection is only consulted while running.
	return p.IsRunning() && p.mconn.CanSend(chID)
}
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
2015-11-10 12:29:43 -08:00
var n_ int
wire.WriteString(p.Key, w, &n_, &err)
n += int64(n_)
2015-10-25 18:21:51 -07:00
return
}
// String returns a short description of the peer in the form
// "Peer{<mconn> <key prefix> out}" or "Peer{<mconn> <key prefix> in}",
// depending on the outbound flag. The key prefix is the first 12 bytes of Key,
// or the whole Key if it is shorter.
func (p *Peer) String() string {
	// Truncate defensively: slicing a Key shorter than 12 bytes would panic,
	// and String is typically reached from logging paths.
	shortKey := p.Key
	if len(shortKey) > 12 {
		shortKey = shortKey[:12]
	}
	direction := "in"
	if p.outbound {
		direction = "out"
	}
	return fmt.Sprintf("Peer{%v %v %v}", p.mconn, shortKey, direction)
}
// Equals reports whether p and other have the same Key.
func (p *Peer) Equals(other *Peer) bool {
	ours, theirs := p.Key, other.Key
	return ours == theirs
}
// Get returns the value stored under key in the peer's user Data map.
func (p *Peer) Get(key string) interface{} {
	value := p.Data.Get(key)
	return value
}