commit
e6b7e66bbe
|
@ -17,7 +17,6 @@ const (
|
|||
|
||||
// Fuzz params
|
||||
configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn
|
||||
configFuzzActive = "fuzz_active" // toggle fuzzing
|
||||
configFuzzMode = "fuzz_mode" // eg. drop, delay
|
||||
configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds"
|
||||
configFuzzProbDropRW = "fuzz_prob_drop_rw"
|
||||
|
@ -38,7 +37,6 @@ func setConfigDefaults(config cfg.Config) {
|
|||
|
||||
// Fuzz defaults
|
||||
config.SetDefault(configFuzzEnable, false)
|
||||
config.SetDefault(configFuzzActive, false)
|
||||
config.SetDefault(configFuzzMode, FuzzModeDrop)
|
||||
config.SetDefault(configFuzzMaxDelayMilliseconds, 3000)
|
||||
config.SetDefault(configFuzzProbDropRW, 0.2)
|
||||
|
|
|
@ -74,7 +74,7 @@ type MConnection struct {
|
|||
onReceive receiveCbFunc
|
||||
onError errorCbFunc
|
||||
errored uint32
|
||||
config *MConnectionConfig
|
||||
config *MConnConfig
|
||||
|
||||
quit chan struct{}
|
||||
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
|
||||
|
@ -85,12 +85,20 @@ type MConnection struct {
|
|||
RemoteAddress *NetAddress
|
||||
}
|
||||
|
||||
// MConnectionConfig is a MConnection configuration
|
||||
type MConnectionConfig struct {
|
||||
// MConnConfig is a MConnection configuration.
|
||||
type MConnConfig struct {
|
||||
SendRate int64
|
||||
RecvRate int64
|
||||
}
|
||||
|
||||
// DefaultMConnConfig returns the default config.
|
||||
func DefaultMConnConfig() *MConnConfig {
|
||||
return &MConnConfig{
|
||||
SendRate: defaultSendRate,
|
||||
RecvRate: defaultRecvRate,
|
||||
}
|
||||
}
|
||||
|
||||
// NewMConnection wraps net.Conn and creates multiplex connection
|
||||
func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
|
||||
return NewMConnectionWithConfig(
|
||||
|
@ -98,14 +106,11 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
|
|||
chDescs,
|
||||
onReceive,
|
||||
onError,
|
||||
&MConnectionConfig{
|
||||
SendRate: defaultSendRate,
|
||||
RecvRate: defaultRecvRate,
|
||||
})
|
||||
DefaultMConnConfig())
|
||||
}
|
||||
|
||||
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
|
||||
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection {
|
||||
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
|
||||
mconn := &MConnection{
|
||||
conn: conn,
|
||||
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
|
||||
|
@ -253,6 +258,8 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
// CanSend returns true if you can send more data onto the chID, false
|
||||
// otherwise. Use only as a heuristic.
|
||||
func (c *MConnection) CanSend(chID byte) bool {
|
||||
if !c.IsRunning() {
|
||||
return false
|
||||
|
@ -552,14 +559,12 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
|||
// Goroutine-safe
|
||||
// Times out (and returns false) after defaultSendTimeout
|
||||
func (ch *Channel) sendBytes(bytes []byte) bool {
|
||||
timeout := time.NewTimer(defaultSendTimeout)
|
||||
select {
|
||||
case <-timeout.C:
|
||||
// timeout
|
||||
return false
|
||||
case ch.sendQueue <- bytes:
|
||||
atomic.AddInt32(&ch.sendQueueSize, 1)
|
||||
return true
|
||||
case <-time.After(defaultSendTimeout):
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ func createMConnection(conn net.Conn) *p2p.MConnection {
|
|||
}
|
||||
|
||||
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
|
||||
chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1}}
|
||||
chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
|
||||
return p2p.NewMConnection(conn, chDescs, onReceive, onError)
|
||||
}
|
||||
|
||||
|
@ -37,13 +37,18 @@ func TestMConnectionSend(t *testing.T) {
|
|||
|
||||
msg := "Ant-Man"
|
||||
assert.True(mconn.Send(0x01, msg))
|
||||
assert.False(mconn.CanSend(0x01))
|
||||
// Note: subsequent Send/TrySend calls could pass because we are reading from
|
||||
// the send queue in a separate goroutine.
|
||||
assert.False(mconn.CanSend(0x01), "CanSend should return false because queue is full")
|
||||
server.Read(make([]byte, len(msg)))
|
||||
assert.True(mconn.CanSend(0x01))
|
||||
|
||||
msg = "Spider-Man"
|
||||
assert.True(mconn.TrySend(0x01, msg))
|
||||
server.Read(make([]byte, len(msg)))
|
||||
|
||||
assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
|
||||
assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
|
||||
}
|
||||
|
||||
func TestMConnectionReceive(t *testing.T) {
|
||||
|
|
204
fuzz.go
204
fuzz.go
|
@ -5,86 +5,147 @@ import (
|
|||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cfg "github.com/tendermint/go-config"
|
||||
)
|
||||
|
||||
//--------------------------------------------------------
|
||||
// delay reads/writes
|
||||
// randomly drop reads/writes
|
||||
// randomly drop connections
|
||||
|
||||
const (
|
||||
FuzzModeDrop = "drop"
|
||||
FuzzModeDelay = "delay"
|
||||
// FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep
|
||||
FuzzModeDrop = iota
|
||||
// FuzzModeDelay is a mode in which we randomly sleep
|
||||
FuzzModeDelay
|
||||
)
|
||||
|
||||
func FuzzConn(config cfg.Config, conn net.Conn) net.Conn {
|
||||
return &FuzzedConnection{
|
||||
conn: conn,
|
||||
start: time.After(time.Second * 10), // so we have time to do peer handshakes and get set up
|
||||
params: config,
|
||||
}
|
||||
}
|
||||
|
||||
// FuzzedConnection wraps any net.Conn and depending on the mode either delays
|
||||
// reads/writes or randomly drops reads/writes/connections.
|
||||
type FuzzedConnection struct {
|
||||
conn net.Conn
|
||||
|
||||
mtx sync.Mutex
|
||||
fuzz bool // we don't start fuzzing right away
|
||||
start <-chan time.Time
|
||||
mtx sync.Mutex
|
||||
start <-chan time.Time
|
||||
active bool
|
||||
|
||||
// fuzz params
|
||||
params cfg.Config
|
||||
config *FuzzConnConfig
|
||||
}
|
||||
|
||||
// FuzzConnConfig is a FuzzedConnection configuration.
|
||||
type FuzzConnConfig struct {
|
||||
Mode int
|
||||
MaxDelay time.Duration
|
||||
ProbDropRW float64
|
||||
ProbDropConn float64
|
||||
ProbSleep float64
|
||||
}
|
||||
|
||||
// DefaultFuzzConnConfig returns the default config.
|
||||
func DefaultFuzzConnConfig() *FuzzConnConfig {
|
||||
return &FuzzConnConfig{
|
||||
Mode: FuzzModeDrop,
|
||||
MaxDelay: 3 * time.Second,
|
||||
ProbDropRW: 0.2,
|
||||
ProbDropConn: 0.00,
|
||||
ProbSleep: 0.00,
|
||||
}
|
||||
}
|
||||
|
||||
// FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately.
|
||||
func FuzzConn(conn net.Conn) net.Conn {
|
||||
return FuzzConnFromConfig(conn, DefaultFuzzConnConfig())
|
||||
}
|
||||
|
||||
// FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing
|
||||
// starts immediately.
|
||||
func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn {
|
||||
return &FuzzedConnection{
|
||||
conn: conn,
|
||||
start: make(<-chan time.Time),
|
||||
active: true,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the
|
||||
// duration elapses.
|
||||
func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn {
|
||||
return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig())
|
||||
}
|
||||
|
||||
// FuzzConnAfterFromConfig creates a new FuzzedConnection from a config.
|
||||
// Fuzzing starts when the duration elapses.
|
||||
func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn {
|
||||
return &FuzzedConnection{
|
||||
conn: conn,
|
||||
start: time.After(d),
|
||||
active: false,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// Config returns the connection's config.
|
||||
func (fc *FuzzedConnection) Config() *FuzzConnConfig {
|
||||
return fc.config
|
||||
}
|
||||
|
||||
// Read implements net.Conn.
|
||||
func (fc *FuzzedConnection) Read(data []byte) (n int, err error) {
|
||||
if fc.fuzz() {
|
||||
return 0, nil
|
||||
}
|
||||
return fc.conn.Read(data)
|
||||
}
|
||||
|
||||
// Write implements net.Conn.
|
||||
func (fc *FuzzedConnection) Write(data []byte) (n int, err error) {
|
||||
if fc.fuzz() {
|
||||
return 0, nil
|
||||
}
|
||||
return fc.conn.Write(data)
|
||||
}
|
||||
|
||||
// Close implements net.Conn.
|
||||
func (fc *FuzzedConnection) Close() error { return fc.conn.Close() }
|
||||
|
||||
// LocalAddr implements net.Conn.
|
||||
func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() }
|
||||
|
||||
// RemoteAddr implements net.Conn.
|
||||
func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() }
|
||||
|
||||
// SetDeadline implements net.Conn.
|
||||
func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) }
|
||||
|
||||
// SetReadDeadline implements net.Conn.
|
||||
func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error {
|
||||
return fc.conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline implements net.Conn.
|
||||
func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error {
|
||||
return fc.conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) randomDuration() time.Duration {
|
||||
return time.Millisecond * time.Duration(rand.Int()%fc.MaxDelayMilliseconds())
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) Active() bool {
|
||||
return fc.params.GetBool(configFuzzActive)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) Mode() string {
|
||||
return fc.params.GetString(configFuzzMode)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) ProbDropRW() float64 {
|
||||
return fc.params.GetFloat64(configFuzzProbDropRW)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) ProbDropConn() float64 {
|
||||
return fc.params.GetFloat64(configFuzzProbDropConn)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) ProbSleep() float64 {
|
||||
return fc.params.GetFloat64(configFuzzProbSleep)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) MaxDelayMilliseconds() int {
|
||||
return fc.params.GetInt(configFuzzMaxDelayMilliseconds)
|
||||
maxDelayMillis := int(fc.config.MaxDelay.Nanoseconds() / 1000)
|
||||
return time.Millisecond * time.Duration(rand.Int()%maxDelayMillis)
|
||||
}
|
||||
|
||||
// implements the fuzz (delay, kill conn)
|
||||
// and returns whether or not the read/write should be ignored
|
||||
func (fc *FuzzedConnection) Fuzz() bool {
|
||||
func (fc *FuzzedConnection) fuzz() bool {
|
||||
if !fc.shouldFuzz() {
|
||||
return false
|
||||
}
|
||||
|
||||
switch fc.Mode() {
|
||||
switch fc.config.Mode {
|
||||
case FuzzModeDrop:
|
||||
// randomly drop the r/w, drop the conn, or sleep
|
||||
r := rand.Float64()
|
||||
if r <= fc.ProbDropRW() {
|
||||
if r <= fc.config.ProbDropRW {
|
||||
return true
|
||||
} else if r < fc.ProbDropRW()+fc.ProbDropConn() {
|
||||
} else if r < fc.config.ProbDropRW+fc.config.ProbDropConn {
|
||||
// XXX: can't this fail because machine precision?
|
||||
// XXX: do we need an error?
|
||||
fc.Close()
|
||||
return true
|
||||
} else if r < fc.ProbDropRW()+fc.ProbDropConn()+fc.ProbSleep() {
|
||||
} else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep {
|
||||
time.Sleep(fc.randomDuration())
|
||||
}
|
||||
case FuzzModeDelay:
|
||||
|
@ -94,48 +155,19 @@ func (fc *FuzzedConnection) Fuzz() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// we don't fuzz until start chan fires
|
||||
func (fc *FuzzedConnection) shouldFuzz() bool {
|
||||
if !fc.Active() {
|
||||
return false
|
||||
if fc.active {
|
||||
return true
|
||||
}
|
||||
|
||||
fc.mtx.Lock()
|
||||
defer fc.mtx.Unlock()
|
||||
if fc.fuzz {
|
||||
return true
|
||||
}
|
||||
|
||||
select {
|
||||
case <-fc.start:
|
||||
fc.fuzz = true
|
||||
fc.active = true
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) Read(data []byte) (n int, err error) {
|
||||
if fc.Fuzz() {
|
||||
return 0, nil
|
||||
}
|
||||
return fc.conn.Read(data)
|
||||
}
|
||||
|
||||
func (fc *FuzzedConnection) Write(data []byte) (n int, err error) {
|
||||
if fc.Fuzz() {
|
||||
return 0, nil
|
||||
}
|
||||
return fc.conn.Write(data)
|
||||
}
|
||||
|
||||
// Implements net.Conn
|
||||
func (fc *FuzzedConnection) Close() error { return fc.conn.Close() }
|
||||
func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() }
|
||||
func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() }
|
||||
func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) }
|
||||
func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error {
|
||||
return fc.conn.SetReadDeadline(t)
|
||||
}
|
||||
func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error {
|
||||
return fc.conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
hash: 92a49cbcf88a339e4d29559fe291c30e61eacda1020fd04dfcd97de834e18b3e
|
||||
updated: 2017-04-10T11:17:14.66226896Z
|
||||
hash: ef8ea7b02d9a133bfbfcf3f4615d43be0956ad2bc9eb0050e0721fca12d09308
|
||||
updated: 2017-04-14T08:28:07.579629532Z
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 4b348c1d33373d672edd83fc576892d0e46686d2
|
||||
|
@ -25,9 +25,9 @@ imports:
|
|||
- name: github.com/tendermint/go-config
|
||||
version: 620dcbbd7d587cf3599dedbf329b64311b0c307a
|
||||
- name: github.com/tendermint/go-crypto
|
||||
version: 3f47cfac5fcd9e0f1727c7db980b3559913b3e3a
|
||||
version: 750b25c47a5782f5f2b773ed9e706cb82b3ccef4
|
||||
- name: github.com/tendermint/go-data
|
||||
version: c955b191240568440ea902e14dad2ce19727543a
|
||||
version: e7fcc6d081ec8518912fcdc103188275f83a3ee5
|
||||
- name: github.com/tendermint/go-flowrate
|
||||
version: a20c98e61957faa93b4014fbd902f20ab9317a6a
|
||||
subpackages:
|
||||
|
@ -41,7 +41,7 @@ imports:
|
|||
subpackages:
|
||||
- term
|
||||
- name: golang.org/x/crypto
|
||||
version: 9ef620b9ca2f82b55030ffd4f41327fa9e77a92c
|
||||
version: cbc3d0884eac986df6e78a039b8792e869bff863
|
||||
subpackages:
|
||||
- curve25519
|
||||
- nacl/box
|
||||
|
|
|
@ -3,8 +3,9 @@ import:
|
|||
- package: github.com/tendermint/go-common
|
||||
- package: github.com/tendermint/go-config
|
||||
- package: github.com/tendermint/go-crypto
|
||||
version: develop
|
||||
- package: github.com/tendermint/go-data
|
||||
version: c955b191240568440ea902e14dad2ce19727543a
|
||||
version: develop
|
||||
- package: github.com/tendermint/go-flowrate
|
||||
subpackages:
|
||||
- flowrate
|
||||
|
@ -16,6 +17,7 @@ import:
|
|||
- nacl/box
|
||||
- nacl/secretbox
|
||||
- ripemd160
|
||||
- package: github.com/pkg/errors
|
||||
testImport:
|
||||
- package: github.com/stretchr/testify
|
||||
subpackages:
|
||||
|
|
|
@ -6,6 +6,7 @@ package p2p
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -13,28 +14,36 @@ import (
|
|||
cmn "github.com/tendermint/go-common"
|
||||
)
|
||||
|
||||
// NetAddress defines information about a peer on the network
|
||||
// including its IP address, and port.
|
||||
type NetAddress struct {
|
||||
IP net.IP
|
||||
Port uint16
|
||||
str string
|
||||
}
|
||||
|
||||
// NewNetAddress returns a new NetAddress using the provided TCP
|
||||
// address. When testing, other net.Addr (except TCP) will result in
|
||||
// using 0.0.0.0:0. When normal run, other net.Addr (except TCP) will
|
||||
// panic.
|
||||
// TODO: socks proxies?
|
||||
func NewNetAddress(addr net.Addr) *NetAddress {
|
||||
tcpAddr, ok := addr.(*net.TCPAddr)
|
||||
if !ok {
|
||||
log.Warn(`Only TCPAddrs are supported. If used for anything but testing,
|
||||
may result in undefined behaviour!`, "addr", addr)
|
||||
return NewNetAddressIPPort(net.IP("0.0.0.0"), 0)
|
||||
// NOTE: it would be nice to only not panic if we're in testing ...
|
||||
// PanicSanity(Fmt("Only TCPAddrs are supported. Got: %v", addr))
|
||||
if flag.Lookup("test.v") == nil { // normal run
|
||||
cmn.PanicSanity(cmn.Fmt("Only TCPAddrs are supported. Got: %v", addr))
|
||||
} else { // in testing
|
||||
return NewNetAddressIPPort(net.IP("0.0.0.0"), 0)
|
||||
}
|
||||
}
|
||||
ip := tcpAddr.IP
|
||||
port := uint16(tcpAddr.Port)
|
||||
return NewNetAddressIPPort(ip, port)
|
||||
}
|
||||
|
||||
// Also resolves the host if host is not an IP.
|
||||
// NewNetAddressString returns a new NetAddress using the provided
|
||||
// address in the form of "IP:Port". Also resolves the host if host
|
||||
// is not an IP.
|
||||
func NewNetAddressString(addr string) (*NetAddress, error) {
|
||||
|
||||
host, portStr, err := net.SplitHostPort(addr)
|
||||
|
@ -62,6 +71,8 @@ func NewNetAddressString(addr string) (*NetAddress, error) {
|
|||
return na, nil
|
||||
}
|
||||
|
||||
// NewNetAddressStrings returns an array of NetAddress'es build using
|
||||
// the provided strings.
|
||||
func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) {
|
||||
netAddrs := make([]*NetAddress, len(addrs))
|
||||
for i, addr := range addrs {
|
||||
|
@ -74,6 +85,8 @@ func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) {
|
|||
return netAddrs, nil
|
||||
}
|
||||
|
||||
// NewNetAddressIPPort returns a new NetAddress using the provided IP
|
||||
// and port number.
|
||||
func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
|
||||
na := &NetAddress{
|
||||
IP: ip,
|
||||
|
@ -86,23 +99,25 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
|
|||
return na
|
||||
}
|
||||
|
||||
// Equals reports whether na and other are the same addresses.
|
||||
func (na *NetAddress) Equals(other interface{}) bool {
|
||||
if o, ok := other.(*NetAddress); ok {
|
||||
return na.String() == o.String()
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (na *NetAddress) Less(other interface{}) bool {
|
||||
if o, ok := other.(*NetAddress); ok {
|
||||
return na.String() < o.String()
|
||||
} else {
|
||||
cmn.PanicSanity("Cannot compare unequal types")
|
||||
return false
|
||||
}
|
||||
|
||||
cmn.PanicSanity("Cannot compare unequal types")
|
||||
return false
|
||||
}
|
||||
|
||||
// String representation.
|
||||
func (na *NetAddress) String() string {
|
||||
if na.str == "" {
|
||||
na.str = net.JoinHostPort(
|
||||
|
@ -113,6 +128,7 @@ func (na *NetAddress) String() string {
|
|||
return na.str
|
||||
}
|
||||
|
||||
// Dial calls net.Dial on the address.
|
||||
func (na *NetAddress) Dial() (net.Conn, error) {
|
||||
conn, err := net.Dial("tcp", na.String())
|
||||
if err != nil {
|
||||
|
@ -121,6 +137,7 @@ func (na *NetAddress) Dial() (net.Conn, error) {
|
|||
return conn, nil
|
||||
}
|
||||
|
||||
// DialTimeout calls net.DialTimeout on the address.
|
||||
func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) {
|
||||
conn, err := net.DialTimeout("tcp", na.String(), timeout)
|
||||
if err != nil {
|
||||
|
@ -129,6 +146,7 @@ func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) {
|
|||
return conn, nil
|
||||
}
|
||||
|
||||
// Routable returns true if the address is routable.
|
||||
func (na *NetAddress) Routable() bool {
|
||||
// TODO(oga) bitcoind doesn't include RFC3849 here, but should we?
|
||||
return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() ||
|
||||
|
@ -142,10 +160,12 @@ func (na *NetAddress) Valid() bool {
|
|||
na.IP.Equal(net.IPv4bcast))
|
||||
}
|
||||
|
||||
// Local returns true if it is a local address.
|
||||
func (na *NetAddress) Local() bool {
|
||||
return na.IP.IsLoopback() || zero4.Contains(na.IP)
|
||||
}
|
||||
|
||||
// ReachabilityTo checks whenever o can be reached from na.
|
||||
func (na *NetAddress) ReachabilityTo(o *NetAddress) int {
|
||||
const (
|
||||
Unreachable = 0
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package p2p
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewNetAddress(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:8080")
|
||||
require.Nil(err)
|
||||
addr := NewNetAddress(tcpAddr)
|
||||
|
||||
assert.Equal("127.0.0.1:8080", addr.String())
|
||||
|
||||
assert.NotPanics(func() {
|
||||
NewNetAddress(&net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8000})
|
||||
}, "Calling NewNetAddress with UDPAddr should not panic in testing")
|
||||
}
|
||||
|
||||
func TestNewNetAddressString(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
tests := []struct {
|
||||
addr string
|
||||
correct bool
|
||||
}{
|
||||
{"127.0.0.1:8080", true},
|
||||
{"127.0.0:8080", false},
|
||||
{"a", false},
|
||||
{"127.0.0.1:a", false},
|
||||
{"a:8080", false},
|
||||
{"8082", false},
|
||||
{"127.0.0:8080000", false},
|
||||
}
|
||||
|
||||
for _, t := range tests {
|
||||
addr, err := NewNetAddressString(t.addr)
|
||||
if t.correct {
|
||||
require.Nil(err)
|
||||
assert.Equal(t.addr, addr.String())
|
||||
} else {
|
||||
require.NotNil(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewNetAddressStrings(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
addrs, err := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"})
|
||||
require.Nil(err)
|
||||
|
||||
assert.Equal(2, len(addrs))
|
||||
}
|
||||
|
||||
func TestNewNetAddressIPPort(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
addr := NewNetAddressIPPort(net.ParseIP("127.0.0.1"), 8080)
|
||||
|
||||
assert.Equal("127.0.0.1:8080", addr.String())
|
||||
}
|
||||
|
||||
func TestNetAddressProperties(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
// TODO add more test cases
|
||||
tests := []struct {
|
||||
addr string
|
||||
valid bool
|
||||
local bool
|
||||
routable bool
|
||||
}{
|
||||
{"127.0.0.1:8080", true, true, false},
|
||||
{"ya.ru:80", true, false, true},
|
||||
}
|
||||
|
||||
for _, t := range tests {
|
||||
addr, err := NewNetAddressString(t.addr)
|
||||
require.Nil(err)
|
||||
|
||||
assert.Equal(t.valid, addr.Valid())
|
||||
assert.Equal(t.local, addr.Local())
|
||||
assert.Equal(t.routable, addr.Routable())
|
||||
}
|
||||
}
|
||||
|
||||
func TestNetAddressReachabilityTo(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
// TODO add more test cases
|
||||
tests := []struct {
|
||||
addr string
|
||||
other string
|
||||
reachability int
|
||||
}{
|
||||
{"127.0.0.1:8080", "127.0.0.1:8081", 0},
|
||||
{"ya.ru:80", "127.0.0.1:8080", 1},
|
||||
}
|
||||
|
||||
for _, t := range tests {
|
||||
addr, err := NewNetAddressString(t.addr)
|
||||
require.Nil(err)
|
||||
|
||||
other, err := NewNetAddressString(t.other)
|
||||
require.Nil(err)
|
||||
|
||||
assert.Equal(t.reachability, addr.ReachabilityTo(other))
|
||||
}
|
||||
}
|
108
peer.go
108
peer.go
|
@ -6,8 +6,8 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
cmn "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
wire "github.com/tendermint/go-wire"
|
||||
)
|
||||
|
@ -25,23 +25,50 @@ type Peer struct {
|
|||
conn net.Conn // source connection
|
||||
mconn *MConnection // multiplex connection
|
||||
|
||||
authEnc bool // authenticated encryption
|
||||
persistent bool
|
||||
config cfg.Config
|
||||
config *PeerConfig
|
||||
|
||||
*NodeInfo
|
||||
Key string
|
||||
Data *cmn.CMap // User data.
|
||||
}
|
||||
|
||||
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
|
||||
// PeerConfig is a Peer configuration.
|
||||
type PeerConfig struct {
|
||||
AuthEnc bool // authenticated encryption
|
||||
|
||||
HandshakeTimeout time.Duration
|
||||
DialTimeout time.Duration
|
||||
|
||||
MConfig *MConnConfig
|
||||
|
||||
Fuzz bool // fuzz connection (for testing)
|
||||
FuzzConfig *FuzzConnConfig
|
||||
}
|
||||
|
||||
// DefaultPeerConfig returns the default config.
|
||||
func DefaultPeerConfig() *PeerConfig {
|
||||
return &PeerConfig{
|
||||
AuthEnc: true,
|
||||
HandshakeTimeout: 2 * time.Second,
|
||||
DialTimeout: 3 * time.Second,
|
||||
MConfig: DefaultMConnConfig(),
|
||||
Fuzz: false,
|
||||
FuzzConfig: DefaultFuzzConnConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
|
||||
return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
|
||||
}
|
||||
|
||||
func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
|
||||
conn, err := dial(addr, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Error creating peer")
|
||||
}
|
||||
|
||||
// outbound = true
|
||||
peer, err := newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey)
|
||||
peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
|
@ -49,31 +76,43 @@ func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*Channel
|
|||
return peer, nil
|
||||
}
|
||||
|
||||
func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
|
||||
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
|
||||
return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
|
||||
}
|
||||
|
||||
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
|
||||
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
|
||||
}
|
||||
|
||||
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
|
||||
conn := rawConn
|
||||
|
||||
// Fuzz connection
|
||||
if config.Fuzz {
|
||||
// so we have time to do peer handshakes and get set up
|
||||
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
|
||||
}
|
||||
|
||||
// Encrypt connection
|
||||
if config.GetBool(configKeyAuthEnc) {
|
||||
if config.AuthEnc {
|
||||
conn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
|
||||
|
||||
var err error
|
||||
// Set deadline for handshake so we don't block forever on conn.ReadFull
|
||||
timeout := time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second
|
||||
conn.SetDeadline(time.Now().Add(timeout))
|
||||
conn, err = MakeSecretConnection(conn, privKey)
|
||||
conn, err = MakeSecretConnection(conn, ourNodePrivKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "Error creating peer")
|
||||
}
|
||||
// remove deadline
|
||||
conn.SetDeadline(time.Time{})
|
||||
}
|
||||
|
||||
// Key and NodeInfo are set after Handshake
|
||||
p := &Peer{
|
||||
outbound: outbound,
|
||||
authEnc: config.GetBool(configKeyAuthEnc),
|
||||
conn: conn,
|
||||
config: config,
|
||||
Data: cmn.NewCMap(),
|
||||
}
|
||||
|
||||
p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config)
|
||||
p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
|
||||
|
||||
p.BaseService = *cmn.NewBaseService(log, "Peer", p)
|
||||
|
||||
|
@ -119,13 +158,13 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
|
|||
log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
|
||||
})
|
||||
if err1 != nil {
|
||||
return err1
|
||||
return errors.Wrap(err1, "Error during handshake/write")
|
||||
}
|
||||
if err2 != nil {
|
||||
return err2
|
||||
return errors.Wrap(err2, "Error during handshake/read")
|
||||
}
|
||||
|
||||
if p.authEnc {
|
||||
if p.config.AuthEnc {
|
||||
// Check that the professed PubKey matches the sconn's.
|
||||
if !peerNodeInfo.PubKey.Equals(p.PubKey()) {
|
||||
return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
|
||||
|
@ -136,7 +175,7 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
|
|||
// Remove deadline
|
||||
p.conn.SetDeadline(time.Time{})
|
||||
|
||||
peerNodeInfo.RemoteAddr = p.RemoteAddr().String()
|
||||
peerNodeInfo.RemoteAddr = p.Addr().String()
|
||||
|
||||
p.NodeInfo = peerNodeInfo
|
||||
p.Key = peerNodeInfo.PubKey.KeyString()
|
||||
|
@ -144,14 +183,14 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
func (p *Peer) RemoteAddr() net.Addr {
|
||||
// Addr returns peer's network address.
|
||||
func (p *Peer) Addr() net.Addr {
|
||||
return p.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
// PubKey returns the remote public key.
|
||||
// PubKey returns peer's public key.
|
||||
func (p *Peer) PubKey() crypto.PubKeyEd25519 {
|
||||
if p.authEnc {
|
||||
if p.config.AuthEnc {
|
||||
return p.conn.(*SecretConnection).RemotePubKey()
|
||||
}
|
||||
if p.NodeInfo == nil {
|
||||
|
@ -238,21 +277,17 @@ func (p *Peer) Get(key string) interface{} {
|
|||
return p.Data.Get(key)
|
||||
}
|
||||
|
||||
func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) {
|
||||
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
|
||||
log.Info("Dialing address", "address", addr)
|
||||
conn, err := addr.DialTimeout(time.Duration(
|
||||
config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
|
||||
conn, err := addr.DialTimeout(config.DialTimeout)
|
||||
if err != nil {
|
||||
log.Info("Failed dialing address", "address", addr, "error", err)
|
||||
return nil, err
|
||||
}
|
||||
if config.GetBool(configFuzzEnable) {
|
||||
conn = FuzzConn(config, conn)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection {
|
||||
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
reactor := reactorsByCh[chID]
|
||||
if reactor == nil {
|
||||
|
@ -265,10 +300,5 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
|
|||
onPeerError(p, r)
|
||||
}
|
||||
|
||||
mconnConfig := &MConnectionConfig{
|
||||
SendRate: int64(config.GetInt(configKeySendRate)),
|
||||
RecvRate: int64(config.GetInt(configKeyRecvRate)),
|
||||
}
|
||||
|
||||
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig)
|
||||
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
package p2p
|
||||
|
||||
import (
|
||||
golog "log"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
)
|
||||
|
||||
func TestPeerBasic(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
// simulate remote peer
|
||||
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
|
||||
rp.Start()
|
||||
defer rp.Stop()
|
||||
|
||||
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig())
|
||||
require.Nil(err)
|
||||
|
||||
p.Start()
|
||||
defer p.Stop()
|
||||
|
||||
assert.True(p.IsRunning())
|
||||
assert.True(p.IsOutbound())
|
||||
assert.False(p.IsPersistent())
|
||||
p.makePersistent()
|
||||
assert.True(p.IsPersistent())
|
||||
assert.Equal(rp.Addr().String(), p.Addr().String())
|
||||
assert.Equal(rp.PubKey(), p.PubKey())
|
||||
}
|
||||
|
||||
func TestPeerWithoutAuthEnc(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
config := DefaultPeerConfig()
|
||||
config.AuthEnc = false
|
||||
|
||||
// simulate remote peer
|
||||
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config}
|
||||
rp.Start()
|
||||
defer rp.Stop()
|
||||
|
||||
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
|
||||
require.Nil(err)
|
||||
|
||||
p.Start()
|
||||
defer p.Stop()
|
||||
|
||||
assert.True(p.IsRunning())
|
||||
}
|
||||
|
||||
func TestPeerSend(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
config := DefaultPeerConfig()
|
||||
config.AuthEnc = false
|
||||
|
||||
// simulate remote peer
|
||||
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config}
|
||||
rp.Start()
|
||||
defer rp.Stop()
|
||||
|
||||
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
|
||||
require.Nil(err)
|
||||
|
||||
p.Start()
|
||||
defer p.Stop()
|
||||
|
||||
assert.True(p.CanSend(0x01))
|
||||
assert.True(p.Send(0x01, "Asylum"))
|
||||
}
|
||||
|
||||
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) {
|
||||
chDescs := []*ChannelDescriptor{
|
||||
&ChannelDescriptor{ID: 0x01, Priority: 1},
|
||||
}
|
||||
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
|
||||
pk := crypto.GenPrivKeyEd25519()
|
||||
p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = p.HandshakeTimeout(&NodeInfo{
|
||||
PubKey: pk.PubKey().(crypto.PubKeyEd25519),
|
||||
Moniker: "host_peer",
|
||||
Network: "testing",
|
||||
Version: "123.123.123",
|
||||
}, 1*time.Second)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
type remotePeer struct {
|
||||
PrivKey crypto.PrivKeyEd25519
|
||||
Config *PeerConfig
|
||||
addr *NetAddress
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func (p *remotePeer) Addr() *NetAddress {
|
||||
return p.addr
|
||||
}
|
||||
|
||||
func (p *remotePeer) PubKey() crypto.PubKeyEd25519 {
|
||||
return p.PrivKey.PubKey().(crypto.PubKeyEd25519)
|
||||
}
|
||||
|
||||
func (p *remotePeer) Start() {
|
||||
l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
|
||||
if e != nil {
|
||||
golog.Fatalf("net.Listen tcp :0: %+v", e)
|
||||
}
|
||||
p.addr = NewNetAddress(l.Addr())
|
||||
p.quit = make(chan struct{})
|
||||
go p.accept(l)
|
||||
}
|
||||
|
||||
func (p *remotePeer) Stop() {
|
||||
close(p.quit)
|
||||
}
|
||||
|
||||
func (p *remotePeer) accept(l net.Listener) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
golog.Fatalf("Failed to accept conn: %+v", err)
|
||||
}
|
||||
peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config)
|
||||
if err != nil {
|
||||
golog.Fatalf("Failed to create a peer: %+v", err)
|
||||
}
|
||||
err = peer.HandshakeTimeout(&NodeInfo{
|
||||
PubKey: p.PrivKey.PubKey().(crypto.PubKeyEd25519),
|
||||
Moniker: "remote_peer",
|
||||
Network: "testing",
|
||||
Version: "123.123.123",
|
||||
}, 1*time.Second)
|
||||
if err != nil {
|
||||
golog.Fatalf("Failed to perform handshake: %+v", err)
|
||||
}
|
||||
select {
|
||||
case <-p.quit:
|
||||
conn.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
58
switch.go
58
switch.go
|
@ -200,7 +200,7 @@ func (sw *Switch) OnStop() {
|
|||
// NOTE: This performs a blocking handshake before the peer is added.
|
||||
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
|
||||
func (sw *Switch) AddPeer(peer *Peer) error {
|
||||
if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil {
|
||||
if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -317,7 +317,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
|
|||
sw.dialing.Set(addr.IP.String(), addr)
|
||||
defer sw.dialing.Delete(addr.IP.String())
|
||||
|
||||
peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
|
||||
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config))
|
||||
if err != nil {
|
||||
log.Info("Failed dialing peer", "address", addr, "error", err)
|
||||
return nil, err
|
||||
|
@ -376,7 +376,7 @@ func (sw *Switch) Peers() IPeerSet {
|
|||
// Disconnect from a peer due to external error, retry if it is a persistent peer.
|
||||
// TODO: make record depending on reason.
|
||||
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
|
||||
addr := NewNetAddress(peer.RemoteAddr())
|
||||
addr := NewNetAddress(peer.Addr())
|
||||
log.Notice("Stopping peer for error", "peer", peer, "error", reason)
|
||||
sw.stopAndRemovePeer(peer, reason)
|
||||
|
||||
|
@ -435,12 +435,8 @@ func (sw *Switch) listenerRoutine(l Listener) {
|
|||
continue
|
||||
}
|
||||
|
||||
if sw.config.GetBool(configFuzzEnable) {
|
||||
inConn = FuzzConn(sw.config, inConn)
|
||||
}
|
||||
|
||||
// New inbound connection!
|
||||
err := sw.AddPeerWithConnection(inConn, false)
|
||||
err := sw.addPeerWithConnectionAndConfig(inConn, peerConfigFromGoConfig(sw.config))
|
||||
if err != nil {
|
||||
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
|
||||
continue
|
||||
|
@ -502,14 +498,14 @@ func Connect2Switches(switches []*Switch, i, j int) {
|
|||
c1, c2 := net.Pipe()
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
err := switchI.AddPeerWithConnection(c1, false)
|
||||
err := switchI.addPeerWithConnection(c1)
|
||||
if PanicOnAddPeerErr && err != nil {
|
||||
panic(err)
|
||||
}
|
||||
doneCh <- struct{}{}
|
||||
}()
|
||||
go func() {
|
||||
err := switchJ.AddPeerWithConnection(c2, false)
|
||||
err := switchJ.addPeerWithConnection(c2)
|
||||
if PanicOnAddPeerErr && err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -544,18 +540,52 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
|
|||
return s
|
||||
}
|
||||
|
||||
// AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch.
|
||||
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
|
||||
peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
|
||||
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
|
||||
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err = sw.AddPeer(peer); err != nil {
|
||||
peer.CloseConn()
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
|
||||
peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err = sw.AddPeer(peer); err != nil {
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func peerConfigFromGoConfig(config cfg.Config) *PeerConfig {
|
||||
return &PeerConfig{
|
||||
AuthEnc: config.GetBool(configKeyAuthEnc),
|
||||
Fuzz: config.GetBool(configFuzzEnable),
|
||||
HandshakeTimeout: time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second,
|
||||
DialTimeout: time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second,
|
||||
MConfig: &MConnConfig{
|
||||
SendRate: int64(config.GetInt(configKeySendRate)),
|
||||
RecvRate: int64(config.GetInt(configKeyRecvRate)),
|
||||
},
|
||||
FuzzConfig: &FuzzConnConfig{
|
||||
Mode: config.GetInt(configFuzzMode),
|
||||
MaxDelay: time.Duration(config.GetInt(configFuzzMaxDelayMilliseconds)) * time.Millisecond,
|
||||
ProbDropRW: config.GetFloat64(configFuzzProbDropRW),
|
||||
ProbDropConn: config.GetFloat64(configFuzzProbDropConn),
|
||||
ProbSleep: config.GetFloat64(configFuzzProbSleep),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package p2p
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
golog "log"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -12,7 +11,6 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
. "github.com/tendermint/go-common"
|
||||
cmn "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
wire "github.com/tendermint/go-wire"
|
||||
|
@ -178,10 +176,10 @@ func TestConnAddrFilter(t *testing.T) {
|
|||
|
||||
// connect to good peer
|
||||
go func() {
|
||||
s1.AddPeerWithConnection(c1, false)
|
||||
s1.addPeerWithConnection(c1)
|
||||
}()
|
||||
go func() {
|
||||
s2.AddPeerWithConnection(c2, true)
|
||||
s2.addPeerWithConnection(c2)
|
||||
}()
|
||||
|
||||
// Wait for things to happen, peers to get added...
|
||||
|
@ -213,10 +211,10 @@ func TestConnPubKeyFilter(t *testing.T) {
|
|||
|
||||
// connect to good peer
|
||||
go func() {
|
||||
s1.AddPeerWithConnection(c1, false)
|
||||
s1.addPeerWithConnection(c1)
|
||||
}()
|
||||
go func() {
|
||||
s2.AddPeerWithConnection(c2, true)
|
||||
s2.addPeerWithConnection(c2)
|
||||
}()
|
||||
|
||||
// Wait for things to happen, peers to get added...
|
||||
|
@ -239,14 +237,12 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
|
|||
sw.Start()
|
||||
defer sw.Stop()
|
||||
|
||||
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
|
||||
defer sw2.Stop()
|
||||
l, serverAddr := listenTCP()
|
||||
done := make(chan struct{})
|
||||
go accept(l, done, sw2)
|
||||
defer close(done)
|
||||
// simulate remote peer
|
||||
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
|
||||
rp.Start()
|
||||
defer rp.Stop()
|
||||
|
||||
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
|
||||
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
|
||||
require.Nil(err)
|
||||
err = sw.AddPeer(peer)
|
||||
require.Nil(err)
|
||||
|
@ -267,14 +263,12 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
|
|||
sw.Start()
|
||||
defer sw.Stop()
|
||||
|
||||
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
|
||||
defer sw2.Stop()
|
||||
l, serverAddr := listenTCP()
|
||||
done := make(chan struct{})
|
||||
go accept(l, done, sw2)
|
||||
defer close(done)
|
||||
// simulate remote peer
|
||||
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
|
||||
rp.Start()
|
||||
defer rp.Stop()
|
||||
|
||||
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
|
||||
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
|
||||
peer.makePersistent()
|
||||
require.Nil(err)
|
||||
err = sw.AddPeer(peer)
|
||||
|
@ -334,48 +328,3 @@ func BenchmarkSwitches(b *testing.B) {
|
|||
time.Sleep(1000 * time.Millisecond)
|
||||
|
||||
}
|
||||
|
||||
func listenTCP() (net.Listener, net.Addr) {
|
||||
l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
|
||||
if e != nil {
|
||||
golog.Fatalf("net.Listen tcp :0: %+v", e)
|
||||
}
|
||||
return l, l.Addr()
|
||||
}
|
||||
|
||||
// simulate remote peer
|
||||
func accept(l net.Listener, done <-chan struct{}, sw *Switch) {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
golog.Fatalf("Failed to accept conn: %+v", err)
|
||||
}
|
||||
conn, err = MakeSecretConnection(conn, sw.nodePrivKey)
|
||||
if err != nil {
|
||||
golog.Fatalf("Failed to make secret conn: %+v", err)
|
||||
}
|
||||
var err1, err2 error
|
||||
nodeInfo := new(NodeInfo)
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
var n int
|
||||
wire.WriteBinary(sw.nodeInfo, conn, &n, &err1)
|
||||
},
|
||||
func() {
|
||||
var n int
|
||||
wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2)
|
||||
})
|
||||
if err1 != nil {
|
||||
golog.Fatalf("Failed to do handshake: %+v", err1)
|
||||
}
|
||||
if err2 != nil {
|
||||
golog.Fatalf("Failed to do handshake: %+v", err2)
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
conn.Close()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue