This commit is contained in:
obscuren 2014-01-10 10:58:46 +01:00
parent ce40e51ae5
commit 19addcb51a
1 changed files with 16 additions and 0 deletions

16
peer.go
View File

@ -31,6 +31,7 @@ func ReadMessage(conn net.Conn) (*InMsg, error) {
// XXX The data specification is made up. This will change once more details have been released on the specification of the format
decoder := NewRlpDecoder(buff[:n])
t := decoder.Get(0).AsString()
// If the msgdata contains no data we throw an error and disconnect the peer
if t == "" {
return nil, errors.New("Data contained no data type")
}
@ -39,9 +40,13 @@ func ReadMessage(conn net.Conn) (*InMsg, error) {
}
type Peer struct {
// Server interface
server *Server
// Net connection
conn net.Conn
// Output queue which is used to communicate and handle messages
outputQueue chan OutMsg
// Quit channel
quit chan bool
}
@ -60,21 +65,28 @@ func (p *Peer) QueueMessage(msgType string, data []byte) {
p.outputQueue <- OutMsg{msgType: msgType, data: data}
}
// Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() {
out:
for {
select {
// Main message queue. All outbound messages are processed through here
case msg := <-p.outputQueue:
// TODO Message checking and handle accordingly
p.WriteMessage(msg)
// Break out of the for loop if a quit message is posted
case <- p.quit:
break out
}
}
}
// Write a message to the peer.
func (p *Peer) WriteMessage(msg OutMsg) {
// Encode the type and the (RLP encoded) data for sending over the wire
encoded := Encode([]interface{}{ msg.msgType, msg.data })
// Write to the connection
_, err := p.conn.Write(encoded)
if err != nil {
log.Println(err)
@ -82,11 +94,13 @@ func (p *Peer) WriteMessage(msg OutMsg) {
}
}
// Inbound handler. Inbound messages are received here and passed to the appropriate methods
func (p *Peer) HandleInbound() {
defer p.Stop()
out:
for {
// Wait for a message from the peer
msg, err := ReadMessage(p.conn)
if err != nil {
log.Println(err)
@ -104,7 +118,9 @@ out:
}
func (p *Peer) Start() {
// Run the outbound handler in a new goroutine
go p.HandleOutbound()
// Run the inbound handler in a new goroutine
go p.HandleInbound()
}