diff --git a/peer.go b/peer.go index e3a4f74cb..e6f752022 100644 --- a/peer.go +++ b/peer.go @@ -9,6 +9,11 @@ import ( "time" ) +const ( + // The size of the output buffer for writing messages + outputBufferSize = 50 +) + type Peer struct { // Server interface server *Server @@ -24,11 +29,12 @@ type Peer struct { connected int32 disconnect int32 lastSend time.Time + versionKnown bool } func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { return &Peer{ - outputQueue: make(chan *ethwire.InOutMsg, 1), // Buffered chan of 1 is enough + outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize), quit: make(chan bool), server: server, conn: conn, @@ -40,7 +46,7 @@ func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { func NewOutboundPeer(addr string, server *Server) *Peer { p := &Peer{ - outputQueue: make(chan *ethwire.InOutMsg, 1), // Buffered chan of 1 is enough + outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize), quit: make(chan bool), server: server, inbound: false, @@ -61,6 +67,8 @@ func NewOutboundPeer(addr string, server *Server) *Peer { atomic.StoreInt32(&p.disconnect, 0) log.Println("Connected to peer ::", conn.RemoteAddr()) + + p.Start() }() return p @@ -77,6 +85,14 @@ func (p *Peer) writeMessage(msg *ethwire.InOutMsg) { return } + if !p.versionKnown { + switch msg.MsgType { + case "verack": // Ok + default: // Anything but ack is allowed + return + } + } + err := ethwire.WriteMessage(p.conn, msg) if err != nil { log.Println("Can't send message:", err) @@ -191,10 +207,11 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) { log.Println("Peer connected to self, disconnecting") p.Stop() + return } - log.Println("mnonce", msg.Nonce, "snonce", p.server.Nonce) + p.versionKnown = true // If this is an inbound connection send an ack back if p.inbound { diff --git a/server.go b/server.go index b01ecb08c..bc398dd92 100644 --- a/server.go +++ b/server.go @@ -71,9 +71,6 @@ func (s *Server) ConnectToPeer(addr string) error { s.peers.PushBack(peer) - peer.Start() - - return nil } @@ -106,6 +103,8 @@ func (s *Server) Start() { // TMP go func() { + //time.Sleep(500 * time.Millisecond) + for { s.Broadcast("block", s.blockManager.bc.GenesisBlock().MarshalRlp())