server: asynchronously handle internal broadcast/send requests

This commit modifies the request handling within the sever’s
queryHandler goroutine to ensure that requests from the ChannelRouter
or other related sub-systems don’t block the main processing loop.

We do this simply by launching a goroutine to handle the dispatch of
the request.
This commit is contained in:
Olaoluwa Osuntokun 2017-01-24 17:06:23 -08:00
parent ddc3c3ab35
commit 73d5daa2c3
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
1 changed files with 41 additions and 29 deletions

View File

@ -578,49 +578,60 @@ out:
srvrLog.Debugf("Broadcasting %v messages", len(bMsg.msgs))
s.peersMtx.RLock()
for _, peer := range s.peersByPub {
if ignore != nil &&
peer.addr.IdentityKey.IsEqual(ignore) {
// Launch a new goroutine to handle the broadcast
// request, this allows us process this request
// asynchronously without blocking subsequent broadcast
// requests.
go func() {
s.peersMtx.RLock()
for _, peer := range s.peersByPub {
if ignore != nil &&
peer.addr.IdentityKey.IsEqual(ignore) {
srvrLog.Debugf("Skipping %v in broadcast",
ignore.SerializeCompressed())
srvrLog.Debugf("Skipping %v in broadcast",
ignore.SerializeCompressed())
continue
continue
}
for _, msg := range bMsg.msgs {
peer.queueMsg(msg, nil)
}
}
s.peersMtx.RUnlock()
for _, msg := range bMsg.msgs {
peer.queueMsg(msg, nil)
}
}
s.peersMtx.RUnlock()
bMsg.errChan <- nil
bMsg.errChan <- nil
}()
case sMsg := <-s.sendRequests:
// TODO(roasbeef): use [33]byte everywhere instead
// * eliminate usage of mutexes, funnel all peer
// mutation to this goroutine
// mutation to this goroutine
target := sMsg.target.SerializeCompressed()
srvrLog.Debugf("Attempting to send msgs %v to: %x",
len(sMsg.msgs), target)
s.peersMtx.RLock()
targetPeer, ok := s.peersByPub[string(target)]
if !ok {
// Launch a new goroutine to handle this send request,
// this allows us process this request asynchronously
// without blocking future send requests.
go func() {
s.peersMtx.RLock()
targetPeer, ok := s.peersByPub[string(target)]
if !ok {
s.peersMtx.RUnlock()
srvrLog.Errorf("unable to send message to %x, "+
"peer not found", target)
sMsg.errChan <- errors.New("peer not found")
return
}
for _, msg := range sMsg.msgs {
targetPeer.queueMsg(msg, nil)
}
s.peersMtx.RUnlock()
srvrLog.Errorf("unable to send message to %x, "+
"peer not found", target)
sMsg.errChan <- errors.New("peer not found")
continue
}
for _, msg := range sMsg.msgs {
targetPeer.queueMsg(msg, nil)
}
s.peersMtx.RUnlock()
sMsg.errChan <- nil
sMsg.errChan <- nil
}()
case query := <-s.queries:
switch msg := query.(type) {
case *connectPeerMsg:
@ -695,6 +706,7 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) {
Addr: addr,
Permanent: true,
})
msg.err <- nil
} else {
// If we're not making a persistent connection, then we'll
// attempt to connect o the target peer, returning an error