tendermint/p2p/peer.go

110 lines
2.1 KiB
Go
Raw Normal View History

2014-07-07 20:03:50 -07:00
package p2p
2014-06-18 20:48:32 -07:00
import (
2014-07-01 14:50:24 -07:00
"fmt"
"io"
"net"
2014-07-01 14:50:24 -07:00
"sync/atomic"
"github.com/tendermint/tendermint/binary"
2014-09-14 15:37:32 -07:00
. "github.com/tendermint/tendermint/common"
2014-06-18 20:48:32 -07:00
)
2014-06-24 17:28:40 -07:00
type Peer struct {
2014-07-10 22:14:23 -07:00
outbound bool
mconn *MConnection
started uint32
stopped uint32
2014-09-14 15:37:32 -07:00
Key string
Data *CMap // User data.
2014-06-18 20:48:32 -07:00
}
2014-09-14 15:37:32 -07:00
func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer
2014-09-14 15:37:32 -07:00
onReceive := func(chId byte, msgBytes []byte) {
reactor := reactorsByCh[chId]
if reactor == nil {
2014-12-29 18:09:06 -08:00
panic(Fmt("Unknown channel %X", chId))
2014-09-14 15:37:32 -07:00
}
reactor.Receive(chId, p, msgBytes)
}
onError := func(r interface{}) {
p.stop()
onPeerError(p, r)
}
2014-09-14 15:37:32 -07:00
mconn := NewMConnection(conn, chDescs, onReceive, onError)
p = &Peer{
outbound: outbound,
mconn: mconn,
stopped: 0,
Key: mconn.RemoteAddress.String(),
2014-09-14 15:37:32 -07:00
Data: NewCMap(),
2014-07-01 14:50:24 -07:00
}
return p
2014-06-28 13:09:04 -07:00
}
func (p *Peer) start() {
if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
2014-12-29 18:39:19 -08:00
log.Debug("Starting Peer", "peer", p)
p.mconn.Start()
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
func (p *Peer) stop() {
2014-07-01 14:50:24 -07:00
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
2014-12-29 18:39:19 -08:00
log.Debug("Stopping Peer", "peer", p)
p.mconn.Stop()
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
2014-09-14 15:37:32 -07:00
// IsStopped reports whether the stopped flag has been set. The flag is read
// atomically, matching the atomic compare-and-swap that stop uses to set it.
func (p *Peer) IsStopped() bool {
	flag := atomic.LoadUint32(&p.stopped)
	return flag == 1
}
2014-12-29 15:14:54 -08:00
func (p *Peer) Connection() *MConnection {
return p.mconn
2014-09-14 15:37:32 -07:00
}
2014-07-10 22:14:23 -07:00
func (p *Peer) IsOutbound() bool {
return p.outbound
2014-07-09 18:33:44 -07:00
}
func (p *Peer) Send(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
return false
}
2014-08-31 01:48:40 -07:00
return p.mconn.Send(chId, msg)
}
func (p *Peer) TrySend(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
2014-07-01 14:50:24 -07:00
return false
}
2014-08-31 01:48:40 -07:00
return p.mconn.TrySend(chId, msg)
2014-06-18 20:48:32 -07:00
}
2014-08-10 16:35:08 -07:00
func (p *Peer) CanSend(chId byte) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
2014-08-10 16:35:08 -07:00
return false
}
return p.mconn.CanSend(chId)
}
2014-06-24 17:28:40 -07:00
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
binary.WriteString(p.Key, w, &n, &err)
return
2014-06-18 20:48:32 -07:00
}
2014-06-29 00:35:16 -07:00
func (p *Peer) String() string {
2014-07-17 00:54:48 -07:00
if p.outbound {
2014-10-25 14:27:53 -07:00
return fmt.Sprintf("Peer{->%v}", p.mconn)
2014-07-17 00:54:48 -07:00
} else {
2014-10-25 14:27:53 -07:00
return fmt.Sprintf("Peer{%v->}", p.mconn)
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
// Equals reports whether p and other have the same Key. A nil other is never
// equal to p, so the result is false rather than a nil-pointer panic.
func (p *Peer) Equals(other *Peer) bool {
	if other == nil {
		return false
	}
	return p.Key == other.Key
}