test peer

This commit is contained in:
Anton Kaliaev 2017-04-11 19:47:05 +04:00
parent c39e001a95
commit 7dcc3dbcd1
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
7 changed files with 332 additions and 156 deletions

View File

@ -17,7 +17,6 @@ const (
// Fuzz params // Fuzz params
configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn
configFuzzActive = "fuzz_active" // toggle fuzzing
configFuzzMode = "fuzz_mode" // eg. drop, delay configFuzzMode = "fuzz_mode" // eg. drop, delay
configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds" configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds"
configFuzzProbDropRW = "fuzz_prob_drop_rw" configFuzzProbDropRW = "fuzz_prob_drop_rw"
@ -38,7 +37,6 @@ func setConfigDefaults(config cfg.Config) {
// Fuzz defaults // Fuzz defaults
config.SetDefault(configFuzzEnable, false) config.SetDefault(configFuzzEnable, false)
config.SetDefault(configFuzzActive, false)
config.SetDefault(configFuzzMode, FuzzModeDrop) config.SetDefault(configFuzzMode, FuzzModeDrop)
config.SetDefault(configFuzzMaxDelayMilliseconds, 3000) config.SetDefault(configFuzzMaxDelayMilliseconds, 3000)
config.SetDefault(configFuzzProbDropRW, 0.2) config.SetDefault(configFuzzProbDropRW, 0.2)

View File

@ -74,7 +74,7 @@ type MConnection struct {
onReceive receiveCbFunc onReceive receiveCbFunc
onError errorCbFunc onError errorCbFunc
errored uint32 errored uint32
config *MConnectionConfig config *MConnConfig
quit chan struct{} quit chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
@ -85,12 +85,19 @@ type MConnection struct {
RemoteAddress *NetAddress RemoteAddress *NetAddress
} }
// MConnectionConfig is a MConnection configuration // MConnConfig is a MConnection configuration
type MConnectionConfig struct { type MConnConfig struct {
SendRate int64 SendRate int64
RecvRate int64 RecvRate int64
} }
func defaultMConnectionConfig() *MConnConfig {
return &MConnConfig{
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
}
}
// NewMConnection wraps net.Conn and creates multiplex connection // NewMConnection wraps net.Conn and creates multiplex connection
func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
return NewMConnectionWithConfig( return NewMConnectionWithConfig(
@ -98,14 +105,11 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
chDescs, chDescs,
onReceive, onReceive,
onError, onError,
&MConnectionConfig{ defaultMConnectionConfig())
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
})
} }
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection { func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
mconn := &MConnection{ mconn := &MConnection{
conn: conn, conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufReader: bufio.NewReaderSize(conn, minReadBufferSize),

145
fuzz.go
View File

@ -1,90 +1,139 @@
package p2p package p2p
import ( import (
"fmt"
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
"time" "time"
cfg "github.com/tendermint/go-config"
) )
//--------------------------------------------------------
// delay reads/writes
// randomly drop reads/writes
// randomly drop connections
const ( const (
FuzzModeDrop = "drop" // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep
FuzzModeDelay = "delay" FuzzModeDrop = iota
// FuzzModeDelay is a mode in which we randomly sleep
FuzzModeDelay
) )
func FuzzConn(config cfg.Config, conn net.Conn) net.Conn { type FuzzConnConfig struct {
return &FuzzedConnection{ Mode int
conn: conn, MaxDelay time.Duration
start: time.After(time.Second * 10), // so we have time to do peer handshakes and get set up ProbDropRW float64
params: config, ProbDropConn float64
ProbSleep float64
}
func defaultFuzzConnConfig() *FuzzConnConfig {
return &FuzzConnConfig{
Mode: FuzzModeDrop,
MaxDelay: 3 * time.Second,
ProbDropRW: 0.2,
ProbDropConn: 0.00,
ProbSleep: 0.00,
} }
} }
func FuzzConn(conn net.Conn) net.Conn {
return FuzzConnFromConfig(conn, defaultFuzzConnConfig())
}
func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn {
return &FuzzedConnection{
conn: conn,
start: make(<-chan time.Time),
active: true,
mode: config.Mode,
maxDelay: config.MaxDelay,
probDropRW: config.ProbDropRW,
probDropConn: config.ProbDropConn,
probSleep: config.ProbSleep,
}
}
func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn {
return FuzzConnAfterFromConfig(conn, d, defaultFuzzConnConfig())
}
func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn {
return &FuzzedConnection{
conn: conn,
start: time.After(d),
active: false,
mode: config.Mode,
maxDelay: config.MaxDelay,
probDropRW: config.ProbDropRW,
probDropConn: config.ProbDropConn,
probSleep: config.ProbSleep,
}
}
// FuzzedConnection wraps any net.Conn and depending on the mode either delays
// reads/writes or randomly drops reads/writes/connections.
type FuzzedConnection struct { type FuzzedConnection struct {
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
fuzz bool // we don't start fuzzing right away start <-chan time.Time
start <-chan time.Time active bool
// fuzz params mode int
params cfg.Config maxDelay time.Duration
probDropRW float64
probDropConn float64
probSleep float64
} }
func (fc *FuzzedConnection) randomDuration() time.Duration { func (fc *FuzzedConnection) randomDuration() time.Duration {
return time.Millisecond * time.Duration(rand.Int()%fc.MaxDelayMilliseconds()) maxDelayMillis := int(fc.maxDelay.Nanoseconds() / 1000)
return time.Millisecond * time.Duration(rand.Int()%maxDelayMillis)
} }
func (fc *FuzzedConnection) Active() bool { func (fc *FuzzedConnection) SetMode(mode int) {
return fc.params.GetBool(configFuzzActive) switch mode {
case FuzzModeDrop:
fc.mode = FuzzModeDrop
case FuzzModeDelay:
fc.mode = FuzzModeDelay
default:
panic(fmt.Sprintf("Unknown mode %d", mode))
}
} }
func (fc *FuzzedConnection) Mode() string { func (fc *FuzzedConnection) SetProbDropRW(prob float64) {
return fc.params.GetString(configFuzzMode) fc.probDropRW = prob
} }
func (fc *FuzzedConnection) ProbDropRW() float64 { func (fc *FuzzedConnection) SetProbDropConn(prob float64) {
return fc.params.GetFloat64(configFuzzProbDropRW) fc.probDropConn = prob
} }
func (fc *FuzzedConnection) ProbDropConn() float64 { func (fc *FuzzedConnection) SetProbSleep(prob float64) {
return fc.params.GetFloat64(configFuzzProbDropConn) fc.probSleep = prob
} }
func (fc *FuzzedConnection) ProbSleep() float64 { func (fc *FuzzedConnection) SetMaxDelay(d time.Duration) {
return fc.params.GetFloat64(configFuzzProbSleep) fc.maxDelay = d
}
func (fc *FuzzedConnection) MaxDelayMilliseconds() int {
return fc.params.GetInt(configFuzzMaxDelayMilliseconds)
} }
// implements the fuzz (delay, kill conn) // implements the fuzz (delay, kill conn)
// and returns whether or not the read/write should be ignored // and returns whether or not the read/write should be ignored
func (fc *FuzzedConnection) Fuzz() bool { func (fc *FuzzedConnection) fuzz() bool {
if !fc.shouldFuzz() { if !fc.shouldFuzz() {
return false return false
} }
switch fc.Mode() { switch fc.mode {
case FuzzModeDrop: case FuzzModeDrop:
// randomly drop the r/w, drop the conn, or sleep // randomly drop the r/w, drop the conn, or sleep
r := rand.Float64() r := rand.Float64()
if r <= fc.ProbDropRW() { if r <= fc.probDropRW {
return true return true
} else if r < fc.ProbDropRW()+fc.ProbDropConn() { } else if r < fc.probDropRW+fc.probDropConn {
// XXX: can't this fail because machine precision? // XXX: can't this fail because machine precision?
// XXX: do we need an error? // XXX: do we need an error?
fc.Close() fc.Close()
return true return true
} else if r < fc.ProbDropRW()+fc.ProbDropConn()+fc.ProbSleep() { } else if r < fc.probDropRW+fc.probDropConn+fc.probSleep {
time.Sleep(fc.randomDuration()) time.Sleep(fc.randomDuration())
} }
case FuzzModeDelay: case FuzzModeDelay:
@ -96,33 +145,33 @@ func (fc *FuzzedConnection) Fuzz() bool {
// we don't fuzz until start chan fires // we don't fuzz until start chan fires
func (fc *FuzzedConnection) shouldFuzz() bool { func (fc *FuzzedConnection) shouldFuzz() bool {
if !fc.Active() { if fc.active {
return false return true
} }
fc.mtx.Lock() fc.mtx.Lock()
defer fc.mtx.Unlock() defer fc.mtx.Unlock()
if fc.fuzz {
return true
}
select { select {
case <-fc.start: case <-fc.start:
fc.fuzz = true fc.active = true
return true
default: default:
return false
} }
return false
} }
// Read implements net.Conn
func (fc *FuzzedConnection) Read(data []byte) (n int, err error) { func (fc *FuzzedConnection) Read(data []byte) (n int, err error) {
if fc.Fuzz() { if fc.fuzz() {
return 0, nil return 0, nil
} }
return fc.conn.Read(data) return fc.conn.Read(data)
} }
// Write implements net.Conn
func (fc *FuzzedConnection) Write(data []byte) (n int, err error) { func (fc *FuzzedConnection) Write(data []byte) (n int, err error) {
if fc.Fuzz() { if fc.fuzz() {
return 0, nil return 0, nil
} }
return fc.conn.Write(data) return fc.conn.Write(data)

85
peer.go
View File

@ -7,7 +7,6 @@ import (
"time" "time"
cmn "github.com/tendermint/go-common" cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
) )
@ -25,23 +24,50 @@ type Peer struct {
conn net.Conn // source connection conn net.Conn // source connection
mconn *MConnection // multiplex connection mconn *MConnection // multiplex connection
authEnc bool // authenticated encryption
persistent bool persistent bool
config cfg.Config config *PeerConfig
*NodeInfo *NodeInfo
Key string Key string
Data *cmn.CMap // User data. Data *cmn.CMap // User data.
} }
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { // PeerConfig is a Peer configuration
type PeerConfig struct {
AuthEnc bool // authenticated encryption
HandshakeTimeout time.Duration
DialTimeout time.Duration
MConfig *MConnConfig
Fuzz bool // fuzz connection (for testing)
FuzzConfig *FuzzConnConfig
}
func defaultPeerConfig() *PeerConfig {
return &PeerConfig{
AuthEnc: true,
Fuzz: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
MConfig: defaultMConnectionConfig(),
FuzzConfig: defaultFuzzConnConfig(),
}
}
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, defaultPeerConfig())
}
func newPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// outbound = true // outbound = true
peer, err := newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey) peer, err := newPeerFromExistingConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, err return nil, err
@ -49,31 +75,39 @@ func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*Channel
return peer, nil return peer, nil
} }
func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { func newPeerFromExistingConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newPeerFromExistingConnAndConfig(rawConn, outbound, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, defaultPeerConfig())
}
func newPeerFromExistingConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn := rawConn
// Fuzz connection
if config.Fuzz {
// so we have time to do peer handshakes and get set up
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
}
// Encrypt connection // Encrypt connection
if config.GetBool(configKeyAuthEnc) { if config.AuthEnc {
conn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
var err error var err error
// Set deadline for handshake so we don't block forever on conn.ReadFull conn, err = MakeSecretConnection(conn, ourNodePrivKey)
timeout := time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second
conn.SetDeadline(time.Now().Add(timeout))
conn, err = MakeSecretConnection(conn, privKey)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// remove deadline
conn.SetDeadline(time.Time{})
} }
// Key and NodeInfo are set after Handshake // Key and NodeInfo are set after Handshake
p := &Peer{ p := &Peer{
outbound: outbound, outbound: outbound,
authEnc: config.GetBool(configKeyAuthEnc),
conn: conn, conn: conn,
config: config, config: config,
Data: cmn.NewCMap(), Data: cmn.NewCMap(),
} }
p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config) p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
p.BaseService = *cmn.NewBaseService(log, "Peer", p) p.BaseService = *cmn.NewBaseService(log, "Peer", p)
@ -125,7 +159,7 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
return err2 return err2
} }
if p.authEnc { if p.config.AuthEnc {
// Check that the professed PubKey matches the sconn's. // Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(p.PubKey()) { if !peerNodeInfo.PubKey.Equals(p.PubKey()) {
return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
@ -151,7 +185,7 @@ func (p *Peer) RemoteAddr() net.Addr {
// PubKey returns the remote public key. // PubKey returns the remote public key.
func (p *Peer) PubKey() crypto.PubKeyEd25519 { func (p *Peer) PubKey() crypto.PubKeyEd25519 {
if p.authEnc { if p.config.AuthEnc {
return p.conn.(*SecretConnection).RemotePubKey() return p.conn.(*SecretConnection).RemotePubKey()
} }
if p.NodeInfo == nil { if p.NodeInfo == nil {
@ -238,21 +272,17 @@ func (p *Peer) Get(key string) interface{} {
return p.Data.Get(key) return p.Data.Get(key)
} }
func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) { func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
log.Info("Dialing address", "address", addr) log.Info("Dialing address", "address", addr)
conn, err := addr.DialTimeout(time.Duration( conn, err := addr.DialTimeout(config.DialTimeout)
config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
if err != nil { if err != nil {
log.Info("Failed dialing address", "address", addr, "error", err) log.Info("Failed dialing address", "address", addr, "error", err)
return nil, err return nil, err
} }
if config.GetBool(configFuzzEnable) {
conn = FuzzConn(config, conn)
}
return conn, nil return conn, nil
} }
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection { func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID] reactor := reactorsByCh[chID]
if reactor == nil { if reactor == nil {
@ -265,10 +295,5 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
onPeerError(p, r) onPeerError(p, r)
} }
mconnConfig := &MConnectionConfig{ return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
SendRate: int64(config.GetInt(configKeySendRate)),
RecvRate: int64(config.GetInt(configKeyRecvRate)),
}
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig)
} }

120
peer_test.go Normal file
View File

@ -0,0 +1,120 @@
package p2p
import (
golog "log"
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/go-common"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
)
func TestPeerStartStop(t *testing.T) {
assert, require := assert.New(t), require.New(t)
// simulate remote peer
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519()}
rp.Start()
defer rp.Stop()
p, err := createPeerAndPerformHandshake(rp.RemoteAddr())
require.Nil(err)
p.Start()
defer p.Stop()
assert.True(p.IsRunning())
}
func createPeerAndPerformHandshake(addr *NetAddress) (*Peer, error) {
chDescs := []*ChannelDescriptor{
&ChannelDescriptor{ID: 0x01, Priority: 1},
}
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
pk := crypto.GenPrivKeyEd25519()
p, err := newPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk)
if err != nil {
return nil, err
}
err = p.HandshakeTimeout(&NodeInfo{
PubKey: pk.PubKey().(crypto.PubKeyEd25519),
Moniker: "remote_peer",
Network: "testing",
Version: "123.123.123",
}, 1*time.Second)
if err != nil {
return nil, err
}
return p, nil
}
type remotePeer struct {
PrivKey crypto.PrivKeyEd25519
addr *NetAddress
quit chan struct{}
}
func (p *remotePeer) RemoteAddr() *NetAddress {
return p.addr
}
func (p *remotePeer) Start() {
l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
if e != nil {
golog.Fatalf("net.Listen tcp :0: %+v", e)
}
p.addr = NewNetAddress(l.Addr())
p.quit = make(chan struct{})
go p.accept(l)
}
func (p *remotePeer) Stop() {
close(p.quit)
}
func (p *remotePeer) accept(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
}
conn, err = MakeSecretConnection(conn, p.PrivKey)
if err != nil {
golog.Fatalf("Failed to make secret conn: %+v", err)
}
var err1, err2 error
nodeInfo := new(NodeInfo)
cmn.Parallel(
func() {
var n int
ourNodeInfo := &NodeInfo{
PubKey: p.PrivKey.PubKey().(crypto.PubKeyEd25519),
Moniker: "remote_peer",
Network: "testing",
Version: "123.123.123",
}
wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
},
func() {
var n int
wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2)
})
if err1 != nil {
golog.Fatalf("Failed to do handshake: %+v", err1)
}
if err2 != nil {
golog.Fatalf("Failed to do handshake: %+v", err2)
}
select {
case <-p.quit:
conn.Close()
return
default:
}
}
}

View File

@ -317,7 +317,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
sw.dialing.Set(addr.IP.String(), addr) sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String()) defer sw.dialing.Delete(addr.IP.String())
peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) peer, err := newPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config))
if err != nil { if err != nil {
log.Info("Failed dialing peer", "address", addr, "error", err) log.Info("Failed dialing peer", "address", addr, "error", err)
return nil, err return nil, err
@ -435,12 +435,8 @@ func (sw *Switch) listenerRoutine(l Listener) {
continue continue
} }
if sw.config.GetBool(configFuzzEnable) {
inConn = FuzzConn(sw.config, inConn)
}
// New inbound connection! // New inbound connection!
err := sw.AddPeerWithConnection(inConn, false) err := sw.AddPeerWithConnectionAndConfig(inConn, false, peerConfigFromGoConfig(sw.config))
if err != nil { if err != nil {
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
continue continue
@ -546,7 +542,7 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
// AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch. // AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error { func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err
@ -559,3 +555,38 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
return nil return nil
} }
func (sw *Switch) AddPeerWithConnectionAndConfig(conn net.Conn, outbound bool, config *PeerConfig) error {
peer, err := newPeerFromExistingConnAndConfig(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
if err != nil {
peer.CloseConn()
return err
}
if err = sw.AddPeer(peer); err != nil {
peer.CloseConn()
return err
}
return nil
}
func peerConfigFromGoConfig(config cfg.Config) *PeerConfig {
return &PeerConfig{
AuthEnc: config.GetBool(configKeyAuthEnc),
Fuzz: config.GetBool(configFuzzEnable),
HandshakeTimeout: time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second,
DialTimeout: time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second,
MConfig: &MConnConfig{
SendRate: int64(config.GetInt(configKeySendRate)),
RecvRate: int64(config.GetInt(configKeyRecvRate)),
},
FuzzConfig: &FuzzConnConfig{
Mode: config.GetInt(configFuzzMode),
MaxDelay: time.Duration(config.GetInt(configFuzzMaxDelayMilliseconds)) * time.Millisecond,
ProbDropRW: config.GetFloat64(configFuzzProbDropRW),
ProbDropConn: config.GetFloat64(configFuzzProbDropConn),
ProbSleep: config.GetFloat64(configFuzzProbSleep),
},
}
}

View File

@ -3,7 +3,6 @@ package p2p
import ( import (
"bytes" "bytes"
"fmt" "fmt"
golog "log"
"net" "net"
"sync" "sync"
"testing" "testing"
@ -12,7 +11,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
@ -239,14 +237,12 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
sw.Start() sw.Start()
defer sw.Stop() defer sw.Stop()
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) // simulate remote peer
defer sw2.Stop() rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519()}
l, serverAddr := listenTCP() rp.Start()
done := make(chan struct{}) defer rp.Stop()
go accept(l, done, sw2)
defer close(done)
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) peer, err := newPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)
require.Nil(err) require.Nil(err)
@ -267,14 +263,12 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
sw.Start() sw.Start()
defer sw.Stop() defer sw.Stop()
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) // simulate remote peer
defer sw2.Stop() rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519()}
l, serverAddr := listenTCP() rp.Start()
done := make(chan struct{}) defer rp.Stop()
go accept(l, done, sw2)
defer close(done)
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) peer, err := newPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
peer.makePersistent() peer.makePersistent()
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)
@ -334,48 +328,3 @@ func BenchmarkSwitches(b *testing.B) {
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
} }
func listenTCP() (net.Listener, net.Addr) {
l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
if e != nil {
golog.Fatalf("net.Listen tcp :0: %+v", e)
}
return l, l.Addr()
}
// simulate remote peer
func accept(l net.Listener, done <-chan struct{}, sw *Switch) {
for {
conn, err := l.Accept()
if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
}
conn, err = MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil {
golog.Fatalf("Failed to make secret conn: %+v", err)
}
var err1, err2 error
nodeInfo := new(NodeInfo)
cmn.Parallel(
func() {
var n int
wire.WriteBinary(sw.nodeInfo, conn, &n, &err1)
},
func() {
var n int
wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2)
})
if err1 != nil {
golog.Fatalf("Failed to do handshake: %+v", err1)
}
if err2 != nil {
golog.Fatalf("Failed to do handshake: %+v", err2)
}
select {
case <-done:
conn.Close()
return
default:
}
}
}