tendermint/p2p/connection.go

260 lines
6.1 KiB
Go
Raw Normal View History

2014-07-07 20:03:50 -07:00
package p2p
2014-06-18 20:48:32 -07:00
import (
2014-07-03 13:44:19 -07:00
"bufio"
2014-07-01 14:50:24 -07:00
"fmt"
"net"
"sync/atomic"
"time"
2014-07-14 16:17:42 -07:00
"github.com/op/go-logging"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
2014-06-18 20:48:32 -07:00
)
const (
2014-07-09 14:32:45 -07:00
minReadBufferSize = 1024
minWriteBufferSize = 1024
flushThrottleMS = 50
outQueueSize = 50
idleTimeoutMinutes = 5
pingTimeoutMinutes = 2
2014-06-18 20:48:32 -07:00
)
/*
A Connection wraps a network connection and handles buffering and multiplexing.
"Packets" are sent with ".Send(Packet)".
Packets received are sent to channels as commanded by the ".Start(...)" method.
*/
2014-06-18 20:48:32 -07:00
type Connection struct {
2014-07-01 14:50:24 -07:00
ioStats IOStats
sendQueue chan Packet // never closes
conn net.Conn
bufReader *bufio.Reader
bufWriter *bufio.Writer
flushThrottler *Throttler
quit chan struct{}
pingRepeatTimer *RepeatTimer
pong chan struct{}
channels map[string]*Channel
onError func(interface{})
started uint32
stopped uint32
errored uint32
2014-06-18 20:48:32 -07:00
}
2014-07-09 14:27:32 -07:00
const (
2014-07-09 14:32:45 -07:00
packetTypePing = UInt8(0x00)
packetTypePong = UInt8(0x01)
packetTypeMessage = UInt8(0x10)
2014-06-18 20:48:32 -07:00
)
func NewConnection(conn net.Conn) *Connection {
2014-07-01 14:50:24 -07:00
return &Connection{
2014-07-09 14:32:45 -07:00
sendQueue: make(chan Packet, outQueueSize),
conn: conn,
2014-07-09 14:32:45 -07:00
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
flushThrottler: NewThrottler(flushThrottleMS * time.Millisecond),
quit: make(chan struct{}),
2014-07-09 14:32:45 -07:00
pingRepeatTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
pong: make(chan struct{}),
2014-07-01 14:50:24 -07:00
}
2014-06-18 20:48:32 -07:00
}
// .Start() begins multiplexing packets to and from "channels".
// If an error occurs, the recovered reason is passed to "onError".
func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) {
if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
2014-07-14 16:15:13 -07:00
log.Debug("Starting %v", c)
c.channels = channels
c.onError = onError
go c.sendHandler()
go c.recvHandler()
}
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (c *Connection) Stop() {
2014-07-01 14:50:24 -07:00
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
2014-07-14 16:15:13 -07:00
log.Debug("Stopping %v", c)
2014-07-01 14:50:24 -07:00
close(c.quit)
c.conn.Close()
c.flushThrottler.Stop()
c.pingRepeatTimer.Stop()
2014-07-01 14:50:24 -07:00
// We can't close pong safely here because
// recvHandler may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvHandler.
// close(c.pong)
}
2014-06-24 17:28:40 -07:00
}
func (c *Connection) LocalAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return NewNetAddress(c.conn.LocalAddr())
2014-06-24 17:28:40 -07:00
}
func (c *Connection) RemoteAddress() *NetAddress {
2014-07-01 14:50:24 -07:00
return NewNetAddress(c.conn.RemoteAddr())
2014-06-18 20:48:32 -07:00
}
// Returns true if successfully queued,
// Returns false if connection was closed.
// Blocks.
func (c *Connection) Send(pkt Packet) bool {
select {
case c.sendQueue <- pkt:
return true
case <-c.quit:
return false
}
}
2014-06-29 00:35:16 -07:00
func (c *Connection) String() string {
2014-07-01 14:50:24 -07:00
return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
2014-06-29 00:35:16 -07:00
}
2014-06-18 20:48:32 -07:00
func (c *Connection) flush() {
// TODO: this is pretty naive.
// We end up flushing when we don't have to (yet).
// A better solution might require us implementing our own buffered writer.
err := c.bufWriter.Flush()
if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 {
2014-07-14 16:15:13 -07:00
log.Warning("Connection flush failed: %v", err)
}
}
2014-06-18 20:48:32 -07:00
}
// Catch panics, usually caused by remote disconnects.
func (c *Connection) _recover() {
if r := recover(); r != nil {
c.Stop()
if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
if c.onError != nil {
c.onError(r)
}
}
}
}
// sendHandler pulls from .sendQueue and writes to .bufWriter
func (c *Connection) sendHandler() {
2014-07-14 16:15:13 -07:00
log.Debug("%v sendHandler", c)
defer c._recover()
2014-07-01 14:50:24 -07:00
FOR_LOOP:
for {
var err error
select {
case sendPkt := <-c.sendQueue:
2014-07-14 16:15:13 -07:00
log.Debug("Found pkt from sendQueue. Writing pkt to underlying connection")
2014-07-09 14:32:45 -07:00
_, err = packetTypeMessage.WriteTo(c.bufWriter)
2014-07-01 14:50:24 -07:00
if err != nil {
break
}
2014-07-03 13:44:19 -07:00
_, err = sendPkt.WriteTo(c.bufWriter)
c.flushThrottler.Set()
case <-c.flushThrottler.Ch:
c.flush()
case <-c.pingRepeatTimer.Ch:
2014-07-09 14:32:45 -07:00
_, err = packetTypePing.WriteTo(c.bufWriter)
2014-07-14 16:15:13 -07:00
log.Debug("Send [Ping] -> %v", c)
c.flush()
2014-07-01 14:50:24 -07:00
case <-c.pong:
2014-07-09 14:32:45 -07:00
_, err = packetTypePong.WriteTo(c.bufWriter)
2014-07-14 16:15:13 -07:00
log.Debug("Send [Pong] -> %v", c)
c.flush()
2014-07-01 14:50:24 -07:00
case <-c.quit:
break FOR_LOOP
}
if atomic.LoadUint32(&c.stopped) == 1 {
break FOR_LOOP
}
2014-07-01 14:50:24 -07:00
if err != nil {
2014-07-14 16:15:13 -07:00
log.Info("%v failed @ sendHandler:\n%v", c, err)
2014-07-01 14:50:24 -07:00
c.Stop()
break FOR_LOOP
}
}
2014-07-14 16:15:13 -07:00
log.Debug("%v sendHandler done", c)
2014-07-01 14:50:24 -07:00
// cleanup
2014-06-18 20:48:32 -07:00
}
// recvHandler reads from .bufReader and pushes to the appropriate
// channel's recvQueue.
func (c *Connection) recvHandler() {
2014-07-14 16:15:13 -07:00
log.Debug("%v recvHandler", c)
defer c._recover()
2014-07-01 14:50:24 -07:00
FOR_LOOP:
for {
2014-07-15 15:54:33 -07:00
pktType, err := ReadUInt8Safe(c.bufReader)
2014-07-14 16:17:42 -07:00
if log.IsEnabledFor(logging.DEBUG) {
2014-07-14 16:15:13 -07:00
// peeking into bufReader
numBytes := c.bufReader.Buffered()
bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
2014-07-15 15:54:33 -07:00
if err != nil {
log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes)
}
2014-07-14 16:15:13 -07:00
}
2014-07-01 14:50:24 -07:00
if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 {
2014-07-15 15:54:33 -07:00
log.Info("%v failed @ recvHandler with err: %v", c, err)
2014-07-01 14:50:24 -07:00
c.Stop()
}
break FOR_LOOP
} else {
2014-07-14 16:15:13 -07:00
log.Debug("Found pktType %v", pktType)
2014-07-01 14:50:24 -07:00
}
switch pktType {
2014-07-09 14:32:45 -07:00
case packetTypePing:
// TODO: keep track of these, make sure it isn't abused
// as they cause flush()'s in the send buffer.
2014-07-01 14:50:24 -07:00
c.pong <- struct{}{}
2014-07-09 14:32:45 -07:00
case packetTypePong:
2014-07-01 14:50:24 -07:00
// do nothing
2014-07-14 16:15:13 -07:00
log.Debug("[%v] Received Pong", c)
2014-07-09 14:32:45 -07:00
case packetTypeMessage:
2014-07-03 13:44:19 -07:00
pkt, err := ReadPacketSafe(c.bufReader)
2014-07-01 14:50:24 -07:00
if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 {
2014-07-14 16:15:13 -07:00
log.Info("%v failed @ recvHandler", c)
2014-07-01 14:50:24 -07:00
c.Stop()
}
break FOR_LOOP
}
channel := c.channels[string(pkt.Channel)]
2014-07-01 14:50:24 -07:00
if channel == nil {
Panicf("Unknown channel %v", pkt.Channel)
}
channel.recvQueue <- pkt
default:
Panicf("Unknown message type %v", pktType)
}
c.pingRepeatTimer.Reset()
2014-07-01 14:50:24 -07:00
}
2014-07-14 16:15:13 -07:00
log.Debug("%v recvHandler done", c)
2014-07-01 14:50:24 -07:00
// cleanup
close(c.pong)
for _ = range c.pong {
// drain
}
2014-06-18 20:48:32 -07:00
}
/* IOStats */
type IOStats struct {
2014-07-01 14:50:24 -07:00
TimeConnected Time
LastSent Time
LastRecv Time
BytesRecv UInt64
BytesSent UInt64
PktsRecv UInt64
PktsSent UInt64
2014-06-18 20:48:32 -07:00
}