diff --git a/autopilot/agent.go b/autopilot/agent.go index af119f06..1f7f56d4 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -41,6 +41,12 @@ type Config struct { // within the graph. Graph ChannelGraph + // MaxPendingOpens is the maximum number of pending channel + // establishment goroutines that can be lingering. We cap this value in + // order to control the level of parallelism caused by the autopiloit + // agent. + MaxPendingOpens uint16 + // TODO(roasbeef): add additional signals from fee rates and revenue of // currently opened channels } @@ -410,6 +416,21 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { // top of our controller loop. pendingMtx.Lock() for _, chanCandidate := range chanCandidates { + // Before we proceed, we'll check to see if + // this attempt would take us past the total + // number of allowed pending opens. If so, then + // we'll skip this round and wait for an + // attempt to either fail or succeed. + if uint16(len(pendingOpens))+1 > + a.cfg.MaxPendingOpens { + + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.MaxPendingOpens) + continue + } + nID := NewNodeID(chanCandidate.PeerKey) pendingOpens[nID] = Channel{ Capacity: chanCandidate.ChanAmt, @@ -417,6 +438,7 @@ func (a *Agent) controller(startingBalance btcutil.Amount) { } go func(directive AttachmentDirective) { + pub := directive.PeerKey err := a.cfg.ChanController.OpenChannel( diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index d8a3d57e..ebc15bf9 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -138,7 +138,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -271,7 +272,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} @@ -360,7 +362,8 @@ func TestAgentChannelCloseSignal(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return 0, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } // We'll start the agent with two channels already being active. @@ -483,7 +486,8 @@ func TestAgentBalanceUpdate(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -530,7 +534,7 @@ func TestAgentBalanceUpdate(t *testing.T) { case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: // At this point, the local state of the agent should // have also been updated to reflect that the LN node - // now has an additional 5BTC available. + // now has an additional 5BTC available. const expectedAmt = walletBalance + balanceDelta if agent.totalBalance != expectedAmt { t.Fatalf("expected %v wallet balance "+ @@ -598,7 +602,8 @@ func TestAgentImmediateAttach(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -732,7 +737,8 @@ func TestAgentPendingChannelState(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, + Graph: memGraph, + MaxPendingOpens: 10, } initialChans := []Channel{} agent, err := New(testCfg, initialChans)