tendermint/peer/peer.go

176 lines
3.3 KiB
Go
Raw Normal View History

2014-06-18 20:48:32 -07:00
package peer
import (
2014-07-01 14:50:24 -07:00
"fmt"
"io"
"sync"
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/binary"
2014-06-18 20:48:32 -07:00
)
2014-06-24 17:28:40 -07:00
/* Peer */
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
type Peer struct {
2014-07-01 14:50:24 -07:00
outgoing bool
conn *Connection
channels map[String]*Channel
2014-06-18 20:48:32 -07:00
2014-07-01 14:50:24 -07:00
mtx sync.Mutex
quit chan struct{}
stopped uint32
2014-06-18 20:48:32 -07:00
}
2014-06-28 13:09:04 -07:00
func NewPeer(conn *Connection) *Peer {
2014-07-01 14:50:24 -07:00
return &Peer{
conn: conn,
quit: make(chan struct{}),
stopped: 0,
}
2014-06-28 13:09:04 -07:00
}
2014-07-01 14:50:24 -07:00
func (p *Peer) Start(peerRecvQueues map[String]chan *InboundPacket) {
log.Debugf("Starting %v", p)
p.conn.Start(p.channels)
for chName, _ := range p.channels {
go p.recvHandler(chName, peerRecvQueues[chName])
go p.sendHandler(chName)
}
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) Stop() {
2014-07-01 14:50:24 -07:00
// lock
p.mtx.Lock()
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
log.Debugf("Stopping %v", p)
close(p.quit)
p.conn.Stop()
}
p.mtx.Unlock()
// unlock
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) LocalAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return p.conn.LocalAddress()
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) RemoteAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return p.conn.RemoteAddress()
2014-06-18 20:48:32 -07:00
}
func (p *Peer) Channel(chName String) *Channel {
2014-07-01 14:50:24 -07:00
return p.channels[chName]
2014-06-18 20:48:32 -07:00
}
// If the channel's queue is full, just return false.
// Later the sendHandler will send the pkt to the underlying connection.
func (p *Peer) TrySend(pkt Packet) bool {
2014-07-01 14:50:24 -07:00
channel := p.Channel(pkt.Channel)
sendQueue := channel.SendQueue()
// lock & defer
p.mtx.Lock()
defer p.mtx.Unlock()
if p.stopped == 1 {
return false
}
select {
case sendQueue <- pkt:
return true
default: // buffer full
return false
}
// unlock deferred
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
2014-07-01 14:50:24 -07:00
return p.RemoteAddress().WriteTo(w)
2014-06-18 20:48:32 -07:00
}
2014-06-29 00:35:16 -07:00
func (p *Peer) String() string {
2014-07-01 14:50:24 -07:00
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
2014-06-29 00:35:16 -07:00
}
func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) {
2014-07-01 14:50:24 -07:00
log.Tracef("%v recvHandler [%v]", p, chName)
channel := p.channels[chName]
recvQueue := channel.RecvQueue()
FOR_LOOP:
for {
select {
case <-p.quit:
break FOR_LOOP
case pkt := <-recvQueue:
// send to inboundPacketQueue
inboundPacket := &InboundPacket{
Peer: p,
Channel: channel,
Time: Time{time.Now()},
Packet: pkt,
}
select {
case <-p.quit:
break FOR_LOOP
case inboundPacketQueue <- inboundPacket:
continue
}
}
}
log.Tracef("%v recvHandler [%v] closed", p, chName)
// cleanup
// (none)
2014-06-18 20:48:32 -07:00
}
func (p *Peer) sendHandler(chName String) {
2014-07-01 14:50:24 -07:00
log.Tracef("%v sendHandler [%v]", p, chName)
chSendQueue := p.channels[chName].sendQueue
FOR_LOOP:
for {
select {
case <-p.quit:
break FOR_LOOP
case pkt := <-chSendQueue:
log.Tracef("Sending packet to peer chSendQueue")
// blocks until the connection is Stop'd,
// which happens when this peer is Stop'd.
p.conn.Send(pkt)
}
}
log.Tracef("%v sendHandler [%v] closed", p, chName)
// cleanup
// (none)
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
/* Channel */
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
type Channel struct {
2014-07-01 14:50:24 -07:00
name String
recvQueue chan Packet
sendQueue chan Packet
//stats Stats
2014-06-24 17:28:40 -07:00
}
2014-06-18 20:48:32 -07:00
func NewChannel(name String, bufferSize int) *Channel {
2014-07-01 14:50:24 -07:00
return &Channel{
name: name,
recvQueue: make(chan Packet, bufferSize),
sendQueue: make(chan Packet, bufferSize),
}
2014-06-18 20:48:32 -07:00
}
func (c *Channel) Name() String {
2014-07-01 14:50:24 -07:00
return c.name
2014-06-25 21:37:20 -07:00
}
func (c *Channel) RecvQueue() <-chan Packet {
2014-07-01 14:50:24 -07:00
return c.recvQueue
2014-06-18 20:48:32 -07:00
}
func (c *Channel) SendQueue() chan<- Packet {
2014-07-01 14:50:24 -07:00
return c.sendQueue
2014-06-18 20:48:32 -07:00
}