package p2p import ( "fmt" "sync" "time" ) const ( handlerTimeout = 1000 ) type Handlers map[string](func(p *Peer) Protocol) type Messenger struct { conn *Connection peer *Peer handlers Handlers protocolLock sync.RWMutex protocols []Protocol offsets []MsgCode // offsets for adaptive message idss protocolTable map[string]int quit chan chan bool err chan *PeerError pulse chan bool } func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger { baseProtocol := NewBaseProtocol(peer) return &Messenger{ conn: conn, peer: peer, offsets: []MsgCode{baseProtocol.Offset()}, handlers: handlers, protocols: []Protocol{baseProtocol}, protocolTable: make(map[string]int), err: errchan, pulse: make(chan bool, 1), quit: make(chan chan bool, 1), } } func (self *Messenger) Start() { self.conn.Open() go self.messenger() self.protocolLock.RLock() defer self.protocolLock.RUnlock() self.protocols[0].Start() } func (self *Messenger) Stop() { // close pulse to stop ping pong monitoring close(self.pulse) self.protocolLock.RLock() defer self.protocolLock.RUnlock() for _, protocol := range self.protocols { protocol.Stop() // could be parallel } q := make(chan bool) self.quit <- q <-q self.conn.Close() } func (self *Messenger) messenger() { in := self.conn.Read() for { select { case payload, ok := <-in: //dispatches message to the protocol asynchronously if ok { go self.handle(payload) } else { return } case q := <-self.quit: q <- true return } } } // handles each message by dispatching to the appropriate protocol // using adaptive message codes // this function is started as a separate go routine for each message // it waits for the protocol response // then encodes and sends outgoing messages to the connection's write channel func (self *Messenger) handle(payload []byte) { // send ping to heartbeat channel signalling time of last message // select { // case self.pulse <- true: // default: // } self.pulse <- true // initialise message from payload msg, err := NewMsgFromBytes(payload) if err != nil { self.err <- NewPeerError(MiscError, " %v", err) return } // retrieves protocol based on message Code protocol, offset, peerErr := self.getProtocol(msg.Code()) if err != nil { self.err <- peerErr return } // reset message code based on adaptive offset msg.Decode(offset) // dispatches response := make(chan *Msg) go protocol.HandleIn(msg, response) // protocol reponse timeout to prevent leaks timer := time.After(handlerTimeout * time.Millisecond) for { select { case outgoing, ok := <-response: // we check if response channel is not closed if ok { self.conn.Write() <- outgoing.Encode(offset) } else { return } case <-timer: return } } } // negotiated protocols // stores offsets needed for adaptive message id scheme // based on offsets set at handshake // get the right protocol to handle the message func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) { self.protocolLock.RLock() defer self.protocolLock.RUnlock() base := MsgCode(0) for index, offset := range self.offsets { if code < offset { return self.protocols[index], base, nil } base = offset } return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code) } func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) { fmt.Printf("pingpong keepalive started at %v", time.Now()) timer := time.After(timeout) pinged := false for { select { case _, ok := <-self.pulse: if ok { pinged = false timer = time.After(timeout) } else { // pulse is closed, stop monitoring return } case <-timer: if pinged { fmt.Printf("timeout at %v", time.Now()) timeoutCallback() return } else { fmt.Printf("pinged at %v", time.Now()) pingCallback() timer = time.After(gracePeriod) pinged = true } } } } func (self *Messenger) AddProtocols(protocols []string) { self.protocolLock.Lock() defer self.protocolLock.Unlock() i := len(self.offsets) offset := self.offsets[i-1] for _, name := range protocols { protocolFunc, ok := self.handlers[name] if ok { protocol := protocolFunc(self.peer) self.protocolTable[name] = i i++ offset += protocol.Offset() fmt.Println("offset ", name, offset) self.offsets = append(self.offsets, offset) self.protocols = append(self.protocols, protocol) protocol.Start() } else { fmt.Println("no ", name) // protocol not handled } } } func (self *Messenger) Write(protocol string, msg *Msg) error { self.protocolLock.RLock() defer self.protocolLock.RUnlock() i := 0 offset := MsgCode(0) if len(protocol) > 0 { var ok bool i, ok = self.protocolTable[protocol] if !ok { return fmt.Errorf("protocol %v not handled by peer", protocol) } offset = self.offsets[i-1] } handler := self.protocols[i] // checking if protocol status/caps allows the message to be sent out if handler.HandleOut(msg) { self.conn.Write() <- msg.Encode(offset) } return nil }