From ce2d5a21565ef8984c42da945994799f9c40ccfe Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 10 Mar 2018 16:46:38 -0800 Subject: [PATCH] autopilot: limit the number of outstanding channel open goroutines In this commit, we fix an existing bug that would at times cause us to spiral out of control and potentially created thousands of outbound connections. Our fix is simple: limit the total number of outstanding channel establishment attempts. Without this limit, we have no way to bound the number of active goroutines. Fixes #772. --- autopilot/agent.go | 22 ++++++++++++++++++++++ autopilot/agent_test.go | 20 +++++++++++++------- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index af119f06..1f7f56d4 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -41,6 +41,12 @@ type Config struct { // within the graph. Graph ChannelGraph + // MaxPendingOpens is the maximum number of pending channel + // establishment goroutines that can be lingering. We cap this value in + // order to control the level of parallelism caused by the autopiloit + // agent. + MaxPendingOpens uint16 + // TODO(roasbeef): add additional signals from fee rates and revenue of // currently opened channels } @@ -410,6 +416,21 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // top of our controller loop. pendingMtx.Lock() for _, chanCandidate := range chanCandidates { + // Before we proceed, we'll check to see if + // this attempt would take us past the total + // number of allowed pending opens. If so, then + // we'll skip this round and wait for an + // attempt to either fail or succeed. + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + nID := NewNodeID(chanCandidate.PeerKey) pendingOpens[nID] = Channel{ Capacity: chanCandidate.ChanAmt, @@ -417,6 +438,7 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } go func(directive AttachmentDirective) { + pub := directive.PeerKey err := a.cfg.ChanController.OpenChannel( diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index d8a3d57e..ebc15bf9 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -138,7 +138,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -271,7 +272,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} @@ -360,7 +362,8 @@ func TestAgentChannelCloseSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } // We'll start the agent with two channels already being active. @@ -483,7 +486,8 @@ func TestAgentBalanceUpdate(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -530,7 +534,7 @@ func TestAgentBalanceUpdate(t *testing.T) { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: // At this point, the local state of the agent should // have also been updated to reflect that the LN node - // now has an additional 5BTC available. + // now has an additional 5BTC available. const expectedAmt = walletBalance + balanceDelta if agent.totalBalance != expectedAmt { t.Fatalf("expected %v wallet balance "+ @@ -598,7 +602,8 @@ func TestAgentImmediateAttach(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -732,7 +737,8 @@ func TestAgentPendingChannelState(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans)