From 3e2f37786379ba9d1d73783d6bdc7dc0eb415013 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 2 Jul 2014 16:03:04 -0700 Subject: [PATCH] benchmark works, but could use some improvement. ~33k packets/sec for a single local pair. --- common/random.go | 12 ++++++++++ merkle/iavl_test.go | 23 +++++++----------- merkle/iavl_tree.go | 4 ++++ merkle/util.go | 16 +------------ peer/client.go | 28 +++++++++++----------- peer/client_test.go | 57 +++++++++++++++++++++++++++++++++------------ peer/connection.go | 9 ++++--- peer/listener.go | 13 +++++++++-- peer/log.go | 2 +- peer/peer.go | 3 ++- peer/server.go | 4 +--- 11 files changed, 103 insertions(+), 68 deletions(-) create mode 100644 common/random.go diff --git a/common/random.go b/common/random.go new file mode 100644 index 00000000..186fb2a4 --- /dev/null +++ b/common/random.go @@ -0,0 +1,12 @@ +package common + +import "crypto/rand" + +func RandStr(length int) string { + b := make([]byte, length) + _, err := rand.Read(b) + if err != nil { + return "" + } + return string(b) +} diff --git a/merkle/iavl_test.go b/merkle/iavl_test.go index 78dc0b2e..9dec2631 100644 --- a/merkle/iavl_test.go +++ b/merkle/iavl_test.go @@ -3,29 +3,22 @@ package merkle import ( "bytes" "crypto/sha256" - "encoding/binary" "fmt" + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/db" - "math/rand" - "os" + "runtime" "testing" ) func init() { - if urandom, err := os.Open("/dev/urandom"); err != nil { - return - } else { - buf := make([]byte, 8) - if _, err := urandom.Read(buf); err == nil { - buf_reader := bytes.NewReader(buf) - if seed, err := binary.ReadVarint(buf_reader); err == nil { - rand.Seed(seed) - } - } - urandom.Close() - } + // TODO: seed rand? +} + +func randstr(length int) String { + return String(RandStr(length)) } func TestUnit(t *testing.T) { diff --git a/merkle/iavl_tree.go b/merkle/iavl_tree.go index 0e6fb79d..d6d530b9 100644 --- a/merkle/iavl_tree.go +++ b/merkle/iavl_tree.go @@ -117,6 +117,10 @@ func (t *IAVLTree) Traverse(cb func(Node) bool) { func (t *IAVLTree) Values() <-chan Value { root := t.root ch := make(chan Value) + if root == nil { + close(ch) + return ch + } go func() { root.traverse(t.db, func(n Node) bool { if n.Height() == 0 { diff --git a/merkle/util.go b/merkle/util.go index 740e8cec..578b406f 100644 --- a/merkle/util.go +++ b/merkle/util.go @@ -3,8 +3,8 @@ package merkle import ( "crypto/sha256" "fmt" + . "github.com/tendermint/tendermint/binary" - "os" ) /* @@ -61,20 +61,6 @@ func printIAVLNode(node *IAVLNode, indent int) { } -func randstr(length int) String { - if urandom, err := os.Open("/dev/urandom"); err != nil { - panic(err) - } else { - slice := make([]byte, length) - if _, err := urandom.Read(slice); err != nil { - panic(err) - } - urandom.Close() - return String(slice) - } - panic("unreachable") -} - func maxUint8(a, b uint8) uint8 { if a > b { return a diff --git a/peer/client.go b/peer/client.go index 27b3c711..7689ee8b 100644 --- a/peer/client.go +++ b/peer/client.go @@ -2,11 +2,12 @@ package peer import ( "errors" + "sync" + "sync/atomic" + . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/merkle" - "sync" - "sync/atomic" ) /* Client @@ -99,20 +100,23 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, return peer, nil } -func (c *Client) Broadcast(pkt Packet) { +func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) { if atomic.LoadUint32(&c.stopped) == 1 { return } log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) - for v := range c.Peers().Values() { + for v := range c.peers.Values() { peer := v.(*Peer) success := peer.TrySend(pkt) log.Tracef("Broadcast for peer %v success: %v", peer, success) - if !success { - // TODO: notify the peer + if success { + numSuccess += 1 + } else { + numFailure += 1 } } + return } @@ -128,13 +132,11 @@ func (c *Client) Receive(chName String) *InboundPacket { Panicf("Expected recvQueues[%f], found none", chName) } - for { - select { - case <-c.quit: - return nil - case inPacket := <-q: - return inPacket - } + select { + case <-c.quit: + return nil + case inPacket := <-q: + return inPacket } } diff --git a/peer/client_test.go b/peer/client_test.go index d7c87620..54c1e291 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -1,19 +1,20 @@ package peer import ( - . "github.com/tendermint/tendermint/binary" "testing" "time" + + . "github.com/tendermint/tendermint/binary" ) // convenience method for creating two clients connected to each other. -func makeClientPair(t *testing.T, bufferSize int, channels []string) (*Client, *Client) { +func makeClientPair(t testing.TB, bufferSize int, channels []String) (*Client, *Client) { peerMaker := func(conn *Connection) *Peer { p := NewPeer(conn) p.channels = map[String]*Channel{} - for chName := range channels { - p.channels[String(chName)] = NewChannel(String(chName), bufferSize) + for _, chName := range channels { + p.channels[chName] = NewChannel(chName, bufferSize) } return p } @@ -39,12 +40,18 @@ func makeClientPair(t *testing.T, bufferSize int, channels []string) (*Client, * // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond) + // Close the server, no longer needed. + s1.Stop() + return c1, c2 } func TestClients(t *testing.T) { - c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"}) + channels := []String{"ch1", "ch2", "ch3"} + c1, c2 := makeClientPair(t, 10, channels) + defer c1.Stop() + defer c2.Stop() // Lets send a message from c1 to c2. if c1.Peers().Size() != 1 { @@ -75,31 +82,51 @@ func TestClients(t *testing.T) { if string(inMsg.Bytes) != "channel one" { t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) } - - s1.Stop() - c2.Stop() } func BenchmarkClients(b *testing.B) { b.StopTimer() - // TODO: benchmark the random functions, which is faster? - - c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"}) + channels := []String{"ch1", "ch2", "ch3"} + c1, c2 := makeClientPair(b, 10, channels) + defer c1.Stop() + defer c2.Stop() // Create a sink on either channel to just pop off messages. - // TODO: ensure that when clients stop, this goroutine stops. - recvHandler := func(c *Client) { + recvHandler := func(c *Client, chName String) { + for { + it := c.Receive(chName) + if it == nil { + break + } + } } - go recvHandler(c1) - go recvHandler(c2) + for _, chName := range channels { + go recvHandler(c1, chName) + go recvHandler(c2, chName) + } + // Allow time for goroutines to boot up + time.Sleep(1000 * time.Millisecond) b.StartTimer() + numSuccess, numFailure := 0, 0 + // Send random message from one channel to another for i := 0; i < b.N; i++ { + chName := channels[i%len(channels)] + pkt := NewPacket(chName, ByteSlice("test data")) + nS, nF := c1.Broadcast(pkt) + numSuccess += nS + numFailure += nF } + log.Warnf("success: %v, failure: %v", numSuccess, numFailure) + + // Allow everything to flush before stopping clients & closing connections. + b.StopTimer() + time.Sleep(1000 * time.Millisecond) + } diff --git a/peer/connection.go b/peer/connection.go index 16af6a64..3317057f 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -2,11 +2,12 @@ package peer import ( "fmt" - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" "net" "sync/atomic" "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" ) const ( @@ -115,12 +116,14 @@ FOR_LOOP: break FOR_LOOP } + if atomic.LoadUint32(&c.stopped) == 1 { + break FOR_LOOP + } if err != nil { log.Infof("%v failed @ sendHandler:\n%v", c, err) c.Stop() break FOR_LOOP } - c.flush() } diff --git a/peer/listener.go b/peer/listener.go index 1db13702..0796479d 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -1,9 +1,10 @@ package peer import ( - . "github.com/tendermint/tendermint/common" "net" "sync/atomic" + + . "github.com/tendermint/tendermint/common" ) const ( @@ -51,7 +52,7 @@ func (l *DefaultListener) listenHandler() { conn, err := l.listener.Accept() if atomic.LoadUint32(&l.stopped) == 1 { - return + break // go to cleanup } // listener wasn't stopped, @@ -104,21 +105,29 @@ func GetLocalAddress() *NetAddress { // UPNP external address discovery & port mapping // TODO: more flexible internal & external ports func GetUPNPLocalAddress() *NetAddress { + // XXX remove nil, create option for specifying address. + // removed because this takes too long. + return nil + log.Infof("Getting UPNP local address") nat, err := Discover() if err != nil { + log.Infof("Could not get UPNP local address: %v", err) return nil } ext, err := nat.GetExternalAddress() if err != nil { + log.Infof("Could not get UPNP local address: %v", err) return nil } _, err = nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0) if err != nil { + log.Infof("Could not get UPNP local address: %v", err) return nil } + log.Infof("Got UPNP local address: %v", ext) return NewNetAddressIPPort(ext, DEFAULT_PORT) } diff --git a/peer/log.go b/peer/log.go index c9a3702e..97fad057 100644 --- a/peer/log.go +++ b/peer/log.go @@ -9,7 +9,7 @@ var log seelog.LoggerInterface func init() { // TODO: replace with configuration file in the ~/.tendermint directory. config := ` - + diff --git a/peer/peer.go b/peer/peer.go index beea28bf..a36d90f0 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -2,11 +2,12 @@ package peer import ( "fmt" - . "github.com/tendermint/tendermint/binary" "io" "sync" "sync/atomic" "time" + + . "github.com/tendermint/tendermint/binary" ) /* Peer */ diff --git a/peer/server.go b/peer/server.go index 9749ed2a..bd873396 100644 --- a/peer/server.go +++ b/peer/server.go @@ -1,7 +1,5 @@ package peer -import () - /* Server */ type Server struct { @@ -31,8 +29,8 @@ func (s *Server) IncomingConnectionHandler() { } } +// stops the server, not the client. func (s *Server) Stop() { log.Infof("Stopping server") s.listener.Stop() - s.client.Stop() }