tendermint/p2p/peer.go

270 lines
5.6 KiB
Go
Raw Normal View History

2014-07-07 20:03:50 -07:00
package p2p
2014-06-18 20:48:32 -07:00
import (
"bytes"
2014-07-01 14:50:24 -07:00
"fmt"
"io"
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/binary"
2014-06-18 20:48:32 -07:00
)
2014-06-24 17:28:40 -07:00
/* Peer */
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
type Peer struct {
2014-07-10 22:14:23 -07:00
outbound bool
2014-07-01 14:50:24 -07:00
conn *Connection
channels map[string]*Channel
quit chan struct{}
started uint32
stopped uint32
2014-06-18 20:48:32 -07:00
}
func newPeer(conn *Connection, channels map[string]*Channel) *Peer {
2014-07-01 14:50:24 -07:00
return &Peer{
conn: conn,
channels: channels,
quit: make(chan struct{}),
stopped: 0,
2014-07-01 14:50:24 -07:00
}
2014-06-28 13:09:04 -07:00
}
func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) {
2014-07-14 16:15:13 -07:00
log.Debug("Starting %v", p)
if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
// on connection error
onError := func(r interface{}) {
p.stop()
2014-07-06 16:05:47 -07:00
onPeerError(p, r)
}
p.conn.Start(p.channels, onError)
for chName, _ := range p.channels {
chInQueue := pktRecvQueues[chName]
go p.recvHandler(chName, chInQueue)
go p.sendHandler(chName)
}
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
func (p *Peer) stop() {
2014-07-01 14:50:24 -07:00
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
2014-07-14 16:15:13 -07:00
log.Debug("Stopping %v", p)
2014-07-01 14:50:24 -07:00
close(p.quit)
p.conn.Stop()
}
2014-06-18 20:48:32 -07:00
}
2014-07-10 22:14:23 -07:00
func (p *Peer) IsOutbound() bool {
return p.outbound
2014-07-09 18:33:44 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) LocalAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return p.conn.LocalAddress()
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) RemoteAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return p.conn.RemoteAddress()
2014-06-18 20:48:32 -07:00
}
func (p *Peer) Channel(chName string) *Channel {
2014-07-01 14:50:24 -07:00
return p.channels[chName]
2014-06-18 20:48:32 -07:00
}
// TrySend returns true if the packet was successfully queued.
// Returning true does not imply that the packet will be sent.
func (p *Peer) TrySend(pkt Packet) bool {
2014-07-14 16:15:13 -07:00
log.Debug("TrySend [%v] -> %v", pkt, p)
channel := p.Channel(string(pkt.Channel))
sendQueue := channel.sendQueue
2014-07-01 14:50:24 -07:00
if atomic.LoadUint32(&p.stopped) == 1 {
2014-07-01 14:50:24 -07:00
return false
}
2014-07-01 14:50:24 -07:00
select {
case sendQueue <- pkt:
return true
default: // buffer full
return false
}
2014-06-18 20:48:32 -07:00
}
func (p *Peer) Send(pkt Packet) bool {
2014-07-14 16:15:13 -07:00
log.Debug("Send [%v] -> %v", pkt, p)
channel := p.Channel(string(pkt.Channel))
sendQueue := channel.sendQueue
if atomic.LoadUint32(&p.stopped) == 1 {
return false
}
sendQueue <- pkt
return true
}
2014-06-24 17:28:40 -07:00
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
2014-07-01 14:50:24 -07:00
return p.RemoteAddress().WriteTo(w)
2014-06-18 20:48:32 -07:00
}
2014-06-29 00:35:16 -07:00
func (p *Peer) String() string {
2014-07-10 22:14:23 -07:00
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outbound)
2014-06-29 00:35:16 -07:00
}
// sendHandler pulls from a channel and pushes to the connection.
// Each channel gets its own sendHandler goroutine;
// Golang's channel implementation handles the scheduling.
func (p *Peer) sendHandler(chName string) {
2014-07-14 16:15:13 -07:00
log.Debug("%v sendHandler [%v]", p, chName)
2014-07-01 14:50:24 -07:00
channel := p.channels[chName]
sendQueue := channel.sendQueue
2014-07-01 14:50:24 -07:00
FOR_LOOP:
for {
select {
case <-p.quit:
break FOR_LOOP
case pkt := <-sendQueue:
2014-07-14 16:15:13 -07:00
log.Debug("Sending packet to peer sendQueue")
// blocks until the connection is Stop'd,
// which happens when this peer is Stop'd.
p.conn.Send(pkt)
2014-07-01 14:50:24 -07:00
}
}
2014-07-14 16:15:13 -07:00
log.Debug("%v sendHandler [%v] closed", p, chName)
2014-07-01 14:50:24 -07:00
// cleanup
// (none)
2014-06-18 20:48:32 -07:00
}
// recvHandler pulls from a channel and pushes to the given pktRecvQueue.
// Each channel gets its own recvHandler goroutine.
// Many peers have goroutines that push to the same pktRecvQueue.
// Golang's channel implementation handles the scheduling.
func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) {
2014-07-14 16:15:13 -07:00
log.Debug("%v recvHandler [%v]", p, chName)
channel := p.channels[chName]
recvQueue := channel.recvQueue
2014-07-01 14:50:24 -07:00
FOR_LOOP:
for {
select {
case <-p.quit:
break FOR_LOOP
case pkt := <-recvQueue:
// send to pktRecvQueue
inboundPacket := &InboundPacket{
Peer: p,
Time: Time{time.Now()},
Packet: pkt,
}
select {
case <-p.quit:
break FOR_LOOP
case pktRecvQueue <- inboundPacket:
continue
}
2014-07-01 14:50:24 -07:00
}
}
2014-07-14 16:15:13 -07:00
log.Debug("%v recvHandler [%v] closed", p, chName)
2014-07-01 14:50:24 -07:00
// cleanup
// (none)
2014-06-18 20:48:32 -07:00
}
//-----------------------------------------------------------------------------
// ChannelDescriptor describes a channel to be created with newChannel.
type ChannelDescriptor struct {
	// Name is the channel's name.
	Name string
	// SendBufferSize and RecvBufferSize are the buffer capacities of the
	// channel's send and receive queues, respectively.
	SendBufferSize, RecvBufferSize int
}
2014-07-05 23:50:06 -07:00
/* Channel */
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
type Channel struct {
name string
2014-07-01 14:50:24 -07:00
recvQueue chan Packet
sendQueue chan Packet
//stats Stats
2014-06-24 17:28:40 -07:00
}
2014-06-18 20:48:32 -07:00
func newChannel(desc ChannelDescriptor) *Channel {
2014-07-01 14:50:24 -07:00
return &Channel{
name: desc.Name,
recvQueue: make(chan Packet, desc.RecvBufferSize),
sendQueue: make(chan Packet, desc.SendBufferSize),
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
func (c *Channel) Name() string {
2014-07-01 14:50:24 -07:00
return c.name
2014-06-25 21:37:20 -07:00
}
func (c *Channel) RecvQueue() <-chan Packet {
2014-07-01 14:50:24 -07:00
return c.recvQueue
2014-06-18 20:48:32 -07:00
}
func (c *Channel) SendQueue() chan<- Packet {
2014-07-01 14:50:24 -07:00
return c.sendQueue
2014-06-18 20:48:32 -07:00
}
//-----------------------------------------------------------------------------
2014-07-05 23:50:06 -07:00
// Packet encapsulates a ByteSlice on a Channel.
type Packet struct {
	// Channel is the name of the channel the packet belongs to.
	Channel String
	// Bytes is the packet payload.
	Bytes ByteSlice
	// Hash
}
func NewPacket(chName String, msg Binary) Packet {
msgBytes := BinaryBytes(msg)
2014-07-14 16:15:13 -07:00
log.Debug("NewPacket msg bytes: %X", msgBytes)
2014-07-05 23:50:06 -07:00
return Packet{
Channel: chName,
Bytes: msgBytes,
2014-07-05 23:50:06 -07:00
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
2014-07-06 00:16:05 -07:00
n, err = WriteOnto(p.Channel, w, n, err)
n, err = WriteOnto(p.Bytes, w, n, err)
2014-07-05 23:50:06 -07:00
return
}
// Reader returns an io.Reader positioned at the start of the packet payload.
func (p Packet) Reader() io.Reader {
	payload := bytes.NewReader(p.Bytes)
	return payload
}
// String renders the packet as "<channel>:<payload in upper-case hex>".
func (p Packet) String() string {
	channel, payload := p.Channel, p.Bytes
	return fmt.Sprintf("%v:%X", channel, payload)
}
2014-07-05 23:50:06 -07:00
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
return
}
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil {
return
}
2014-07-14 16:15:13 -07:00
log.Debug("ReadPacket* msg bytes: %X", bytes)
return Packet{Channel: chName, Bytes: bytes}, nil
2014-07-05 23:50:06 -07:00
}
// InboundPacket extends Packet with fields relevant to inbound packets.
type InboundPacket struct {
	// Peer is the peer the packet was received from.
	Peer *Peer
	// Time is set by recvHandler to the moment it took the packet off the
	// channel's receive queue.
	Time Time
	// Packet is the received packet itself, embedded.
	Packet
}