autopilot: "Look ma no hands!", introducing autopilot mode

This commit introduces the initial implementation of the autopilot
mode. Autopilot is new mode within lnd that enables automatic channel
management. This means that if enabled lnd will attempt to
automatically manage channels according to a set of heuristic defined
within the main configuration for autopilot.Agent instance.

The autopilot.Agent implements a simple closed control loop. It takes
in external signals such as wallet balance updates, new open channel,
and channels that are now closed the updates its internal state. With
each external trigger it will consult the registered
AttachmentHeuristic to decide: if it needs to open any more channels,
and if so how much it should use to open the channels, ultimately
returning a set of recommended AttachmentDirectives. The
autopilot.Agent loop will then take those attempt to establish
connection, and go back in waiting for a new external signal.

With this first implementation the default heuristic is the
ConstrainedPrefAttachment implementation of AttachmentHeuristic. Given
a min and max channel size, a limit on the number of channels, and the
percentage of wallet funds to allocate to channels, it will attempt to
execute a heuristic drive by the Barabási–Albert model model in order
to attempt to drive the global graph towards a scale free topology.

This is commit implements a foundational layer for future simulations,
optimization, and additional heuristics.
This commit is contained in:
Olaoluwa Osuntokun 2017-08-10 21:14:41 -07:00
parent a2545d85dc
commit 306c4aef8e
No known key found for this signature in database
GPG Key ID: 3D0A94DB79743DF5
7 changed files with 2415 additions and 0 deletions

353
autopilot/agent.go Normal file
View File

@ -0,0 +1,353 @@
package autopilot
import (
"sync"
"sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcutil"
)
// Config couples all the items that that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
// Self is the identity public key of the Lightning Network node that
// is being driven by the agent. This is used to ensure that we don't
// accidentally attempt to open a channel with ourselves.
Self *btcec.PublicKey
// Heuristic is an attachment heuristic which will govern to whom we
// open channels to, and also what those channels look like in terms of
// desired capacity. The Heuristic will take into account the current
// state of the graph, our set of open channels, and the amount of
// available funds when determining how channels are to be opened.
// Additionally, a heuristic make also factor in extra-graph
// information in order to make more pertinent recommendations.
Heuristic AttachmentHeuristic
// ChanController is an interface that is able to directly manage the
// creation, closing and update of channels within the network.
ChanController ChannelController
// WalletBalance is a function closure that should return the current
// available balance o the backing wallet.
WalletBalance func() (btcutil.Amount, error)
// Graph is an abstract channel graph that the Heuristic and the Agent
// will use to make decisions w.r.t channel allocation and placement
// within the graph.
Graph ChannelGraph
// TODO(roasbeef): add additional signals from fee rates and revenue of
// currently opened channels
}
// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be ware of. This type contains a few
// helper utility methods.
type channelState map[lnwire.ShortChannelID]Channel
// CHannels returns a slice of all the active channels.
func (c channelState) Channels() []Channel {
chans := make([]Channel, 0, len(c))
for _, channel := range c {
chans = append(chans, channel)
}
return chans
}
// ConnectedNodes returns the set of nodes we currently have a channel with.
// This information is needed as we want to avoid making repeated channels with
// any node.
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
nodes := make(map[NodeID]struct{})
for _, channels := range c {
nodes[channels.Node] = struct{}{}
}
return nodes
}
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughput the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channel as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
// Only to be used atomically.
started uint32
stopped uint32
// cfg houses the configuration state of the Ant.
cfg Config
// chanState tracks the current set of open channels.
chanState channelState
// stateUpdates is a channel that any external state updates that may
// affect the heuristics of the agent will be sent over.
stateUpdates chan interface{}
// totalBalance is the total number of satoshis the backing wallet is
// known to control at any given instance. This value will be updated
// when the agent receives external balance update signals.
totalBalance btcutil.Amount
quit chan struct{}
wg sync.WaitGroup
}
// New creates a new instance of the Agent instantiated using the passed
// configuration and initial channel state. The initial channel state slice
// should be populated with the set of Channels that are currently opened by
// the backing Lightning Node.
func New(cfg Config, initialState []Channel) (*Agent, error) {
a := &Agent{
cfg: cfg,
chanState: make(map[lnwire.ShortChannelID]Channel),
quit: make(chan struct{}),
stateUpdates: make(chan interface{}),
}
for _, c := range initialState {
a.chanState[c.ChanID] = c
}
return a, nil
}
// Start starts the agent along with any goroutines it needs to perform its
// normal duties.
func (a *Agent) Start() error {
if !atomic.CompareAndSwapUint32(&a.started, 0, 1) {
return nil
}
log.Infof("Autopilot Agent starting")
startingBalance, err := a.cfg.WalletBalance()
if err != nil {
return err
}
a.wg.Add(1)
go a.controller(startingBalance)
return nil
}
// Stop signals the Agent to gracefully shutdown. This function will block
// until all goroutines have exited.
func (a *Agent) Stop() error {
if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
return nil
}
log.Infof("Autopilot Agent stopping")
close(a.quit)
a.wg.Wait()
return nil
}
// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct {
balanceDelta btcutil.Amount
}
// chanOpenUpdate is a type of external state update the indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
newChan Channel
}
// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
closedChans []lnwire.ShortChannelID
}
// OnBalanceChange is a callback that should be executed each the balance of
// the backing wallet changes.
func (a *Agent) OnBalanceChange(delta btcutil.Amount) {
go func() {
a.stateUpdates <- &balanceUpdate{
balanceDelta: delta,
}
}()
}
// OnChannelOpen is a callback that should be executed each time a new channel
// is manually opened by the user or any system outside the autopilot agent.
func (a *Agent) OnChannelOpen(c Channel) {
go func() {
a.stateUpdates <- &chanOpenUpdate{
newChan: c,
}
}()
}
// OnChannelClose is a callback that should be executed each time a prior
// channel has been closed for any reason. This includes regular
// closes, force closes, and channel breaches.
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
go func() {
a.stateUpdates <- &chanCloseUpdate{
closedChans: closedChans,
}
}()
}
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: it's current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller(startingBalance btcutil.Amount) {
defer a.wg.Done()
// TODO(roasbeef): add queries for internal state?
// We'll start off by assigning our starting balance, and injecting
// that amount as an initial wake up to the main controller goroutine.
a.OnBalanceChange(startingBalance)
// TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so
// TODO(roasbeef): add 10-minute wake up timer
for {
select {
// A new external signal has arrived. We'll use this to update
// our internal state, then determine if we should trigger a
// channel state modification (open/close, splice in/out).
case signal := <-a.stateUpdates:
log.Infof("Processing new external signal")
switch update := signal.(type) {
// The balance of the backing wallet has changed, if
// more funds are now available, we may attempt to open
// up an additional channel, or splice in funds to an
// existing one.
case *balanceUpdate:
log.Debugf("Applying external balance state "+
"update of: %v", update.balanceDelta)
a.totalBalance += update.balanceDelta
// A new channel has been opened successfully. This was
// either opened by the Agent, or an external system
// that is able to drive the Lightning Node.
case *chanOpenUpdate:
log.Debugf("New channel successfully opened, "+
"updating state with: %v",
spew.Sdump(update.newChan))
newChan := update.newChan
a.chanState[newChan.ChanID] = newChan
// A channel has been closed, this may free up an
// available slot, triggering a new channel update.
case *chanCloseUpdate:
log.Debugf("Applying closed channel "+
"updates: %v",
spew.Sdump(update.closedChans))
for _, closedChan := range update.closedChans {
delete(a.chanState, closedChan)
}
}
// With all the updates applied, we'll obtain a set of
// the current active channels.
chans := a.chanState.Channels()
// Now that we've updated our internal state, we'll
// consult our channel attachment heuristic to
// determine if we should open up any additional
// channels or modify existing channels.
availableFunds, needMore := a.cfg.Heuristic.NeedMoreChans(
chans, a.totalBalance,
)
if !needMore {
continue
}
log.Infof("Triggering attachment directive dispatch")
// We're to attempt an attachment so we'll o obtain the
// set of nodes that we currently have channels with so
// we avoid duplicate edges.
nodesToSkip := a.chanState.ConnectedNodes()
// If we reach this point, then according to our
// heuristic we should modify our channel state to tend
// towards what it determines to the optimal state. So
// we'll call Select to get a fresh batch of attachment
// directives, passing in the amount of funds available
// for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
nodesToSkip,
)
if err != nil {
log.Errorf("Unable to select candidates for "+
"attachment: %v", err)
continue
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// For each recommended attachment directive, we'll
// launch a new goroutine to attempt to carry out the
// directive. If any of these succeed, then we'll
// receive a new state update, taking us back to the
// top of our controller loop.
for _, chanCandidate := range chanCandidates {
go func(directive AttachmentDirective) {
pub := directive.PeerKey
err := a.cfg.ChanController.OpenChannel(
directive.PeerKey,
directive.ChanAmt,
directive.Addrs,
)
if err != nil {
log.Warnf("Unable to open "+
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
return
}
// TODO(roasbeef): on err signal
// failure so attempt to allocate
// again?
}(chanCandidate)
}
// The agent has been signalled to exit, so we'll bail out
// immediately.
case <-a.quit:
return
}
}
}

554
autopilot/agent_test.go Normal file
View File

@ -0,0 +1,554 @@
package autopilot
import (
"bytes"
"net"
"sync"
"testing"
"time"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
type moreChansResp struct {
needMore bool
amt btcutil.Amount
}
type mockHeuristic struct {
moreChansResps chan moreChansResp
directiveResps chan []AttachmentDirective
}
func (m *mockHeuristic) NeedMoreChans(chans []Channel,
balance btcutil.Amount) (btcutil.Amount, bool) {
resp := <-m.moreChansResps
return resp.amt, resp.needMore
}
func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph,
amtToUse btcutil.Amount, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) {
resp := <-m.directiveResps
return resp, nil
}
var _ AttachmentHeuristic = (*mockHeuristic)(nil)
type openChanIntent struct {
target *btcec.PublicKey
amt btcutil.Amount
addrs []net.Addr
}
type mockChanController struct {
openChanSignals chan openChanIntent
}
func (m *mockChanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount,
addrs []net.Addr) error {
m.openChanSignals <- openChanIntent{
target: target,
amt: amt,
addrs: addrs,
}
return nil
}
func (m *mockChanController) CloseChannel(chanPoint *wire.OutPoint) error {
return nil
}
func (m *mockChanController) SpliceIn(chanPoint *wire.OutPoint,
amt btcutil.Amount) (*Channel, error) {
return nil, nil
}
func (m *mockChanController) SpliceOut(chanPoint *wire.OutPoint,
amt btcutil.Amount) (*Channel, error) {
return nil, nil
}
var _ ChannelController = (*mockChanController)(nil)
// TestAgentChannelOpenSignal tests that upon receipt of a chanOpenUpdate, then
// agent modifies its local state accordingly, and reconsults the heuristic.
func TestAgentChannelOpenSignal(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent, 10),
}
memGraph, _, _ := newMemChanGraph()
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return 0, nil
},
Graph: memGraph,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its
// initial check.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next we'll signal a new channel being opened by the backing LN node,
// with a capacity of 1 BTC.
newChan := Channel{
ChanID: randChanID(),
Capacity: btcutil.SatoshiPerBitcoin,
}
agent.OnChannelOpen(newChan)
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
// At this point, the local state of the agent should
// have also been updated to reflect that the LN node
// now has an additional channel with one BTC.
if _, ok := agent.chanState[newChan.ChanID]; !ok {
t.Fatalf("internal channel state wasn't updated")
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
default:
}
}
// TestAgentChannelCloseSignal ensures that once the agent receives an outside
// signal of a channel belonging to the backing LN node being closed, then it
// will query the heuristic to make its next decision.
func TestAgentChannelCloseSignal(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return 0, nil
},
Graph: memGraph,
}
// We'll start the agent with two channels already being active.
initialChans := []Channel{
{
ChanID: randChanID(),
Capacity: btcutil.SatoshiPerBitcoin,
},
{
ChanID: randChanID(),
Capacity: btcutil.SatoshiPerBitcoin * 2,
},
}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its
// initial check.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next, we'll close both channels which should force the agent to
// re-query the heuristic.
agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID)
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
// At this point, the local state of the agent should
// have also been updated to reflect that the LN node
// has no existing open channels.
if len(agent.chanState) != 0 {
t.Fatalf("internal channel state wasn't updated")
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
default:
}
}
// TestAgentBalanceUpdateIncrease ensures that once the agent receives an
// outside signal concerning a balance update, then it will re-query the
// heuristic to determine its next action.
func TestAgentBalanceUpdate(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 2 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 2
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
var wg sync.WaitGroup
// We'll send an initial "no" response to advance the agent past its
// initial check.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
wg.Wait()
// Next we'll send a new balance update signal to the agent, adding 5
// BTC to the amount of available funds.
const balanceDelta = btcutil.SatoshiPerBitcoin * 5
agent.OnBalanceChange(balanceDelta)
wg = sync.WaitGroup{}
// The agent should now query the heuristic in order to determine its
// next action as it local state has now been modified.
wg.Add(1)
go func() {
select {
case heuristic.moreChansResps <- moreChansResp{false, 0}:
// At this point, the local state of the agent should
// have also been updated to reflect that the LN node
// now has an additional 5BTC available.
const expectedAmt = walletBalance + balanceDelta
if agent.totalBalance != expectedAmt {
t.Fatalf("expected %v wallet balance "+
"instead have %v", agent.totalBalance,
expectedAmt)
}
// With all of our assertions passed, we'll signal the
// main test goroutine to continue the test.
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
default:
}
}
// TestAgentImmediateAttach tests that if an autopilot agent is created, and it
// has enough funds available to create channels, then it does so immediately.
func TestAgentImmediateAttach(t *testing.T) {
t.Parallel()
// First, we'll create all the dependencies that we'll need in order to
// create the autopilot agent.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
}
memGraph, _, _ := newMemChanGraph()
// The wallet will start with 10 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 10
// With the dependencies we created, we can now create the initial
// agent itself.
testCfg := Config{
Self: self,
Heuristic: heuristic,
ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
if err != nil {
t.Fatalf("unable to create agent: %v", err)
}
// With the autopilot agent and all its dependencies we'll star the
// primary controller goroutine.
if err := agent.Start(); err != nil {
t.Fatalf("unable to start agent: %v", err)
}
defer agent.Stop()
var wg sync.WaitGroup
// The very first thing the agent should do is query the NeedMoreChans
// method on the passed heuristic. So we'll provide it with a response
// that will kick off the main loop.
wg.Add(1)
go func() {
select {
// We'll send over a response indicating that it should
// establish more channels, and give it a budget of 5 BTC to do
// so.
case heuristic.moreChansResps <- moreChansResp{true, 5 * btcutil.SatoshiPerBitcoin}:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for the agent to query the heuristic. If ti doesn't
// do so within 10 seconds, then the test will fail out.
wg.Wait()
// At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop.
const numChans = 5
directives := make([]AttachmentDirective, numChans)
for i := 0; i < numChans; i++ {
directives[i] = AttachmentDirective{
PeerKey: self,
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
}
}
wg = sync.WaitGroup{}
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
wg.Add(1)
go func() {
select {
case heuristic.directiveResps <- directives:
wg.Done()
return
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
}()
// We'll wait here for either the agent to query the heuristic to be
// queried, or for the timeout above to tick.
wg.Wait()
// Finally, we should receive 5 calls to the OpenChannel method with
// the exact same parameters that we specified within the attachment
// directives.
for i := 0; i < numChans; i++ {
select {
case openChan := <-chanController.openChanSignals:
if openChan.amt != btcutil.SatoshiPerBitcoin {
t.Fatalf("invalid chan amt: expected %v, got %v",
btcutil.SatoshiPerBitcoin, openChan.amt)
}
if !openChan.target.IsEqual(self) {
t.Fatalf("unexpected key: expected %x, got %x",
self.SerializeCompressed(),
openChan.target.SerializeCompressed())
}
if len(openChan.addrs) != 1 {
t.Fatalf("should have single addr, instead have: %v",
len(openChan.addrs))
}
case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time")
}
}
}

425
autopilot/graph.go Normal file
View File

@ -0,0 +1,425 @@
package autopilot
import (
"bytes"
"math/big"
prand "math/rand"
"net"
"time"
"github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcutil"
)
var (
testSig = &btcec.Signature{
R: new(big.Int),
S: new(big.Int),
}
_, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10)
_, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10)
)
// databaseChannelGraph wraps a channeldb.ChannelGraph instance with the
// necessary API to properly implement the autopilot.ChannelGraph interface.
//
// TODO(roasbeef): move inmpl to main package?
type databaseChannelGraph struct {
db *channeldb.ChannelGraph
}
// A compile time assertion to ensure databaseChannelGraph meets the
// autopilot.ChannelGraph interface.
var _ ChannelGraph = (*databaseChannelGraph)(nil)
// ChannelGraphFromDatabase returns a instance of the autopilot.ChannelGraph
// backed by a live, open channeldb instance.
func ChannelGraphFromDatabase(db *channeldb.ChannelGraph) ChannelGraph {
return &databaseChannelGraph{
db: db,
}
}
// type dbNode is a wrapper struct around a database transaction an
// channeldb.LightningNode. The wrapper method implement the autopilot.Node
// interface.
type dbNode struct {
tx *bolt.Tx
node *channeldb.LightningNode
}
// A compile time assertion to ensure dbNode meets the autopilot.Node
// interface.
var _ Node = (*dbNode)(nil)
// PubKey is the identity public key of the node. This will be used to attempt
// to target a node for channel opening by the main autopilot agent.
//
// NOTE: Part of the autopilot.Node interface.
func (d dbNode) PubKey() *btcec.PublicKey {
return d.node.PubKey
}
// Addrs returns a slice of publicly reachable public TCP addresses that the
// peer is known to be listening on.
//
// NOTE: Part of the autopilot.Node interface.
func (d dbNode) Addrs() []net.Addr {
return d.node.Addresses
}
// ForEachChannel is a higher-order function that will be used to iterate
// through all edges emanating from/to the target node. For each active
// channel, this function should be called with the populated ChannelEdge that
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.node.ForEachChannel(d.tx, func(tx *bolt.Tx,
ei *channeldb.ChannelEdgeInfo, ep *channeldb.ChannelEdgePolicy) error {
edge := ChannelEdge{
Channel: Channel{
ChanID: lnwire.NewShortChanIDFromInt(ep.ChannelID),
Capacity: ei.Capacity,
FundedAmt: ei.Capacity,
Node: NewNodeID(ep.Node.PubKey),
},
Peer: dbNode{
tx: tx,
node: ep.Node,
},
}
return cb(edge)
})
}
// ForEachNode is a higher-order function that should be called once for each
// connected node within the channel graph. If the passed callback returns an
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
return d.db.ForEachNode(nil, func(tx *bolt.Tx, n *channeldb.LightningNode) error {
// We'll skip over any node that doesn't have any advertised
// addresses. As we won't be able to reach them to actually
// open any channels.
if len(n.Addresses) == 0 {
log.Tracef("Skipping unreachable node %x",
n.PubKey.SerializeCompressed())
return nil
}
node := dbNode{
tx: tx,
node: n,
}
return cb(node)
})
}
// addRandChannel creates a new channel two target nodes. This function is
// meant to aide in the generation of random graphs for use within test cases
// the exercise the autopilot package.
func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
capacity btcutil.Amount) (*ChannelEdge, *ChannelEdge, error) {
fetchNode := func(pub *btcec.PublicKey) (*channeldb.LightningNode, error) {
if pub != nil {
dbNode, err := d.db.FetchLightningNode(pub)
switch {
case err == channeldb.ErrGraphNodeNotFound:
fallthrough
case err == channeldb.ErrGraphNotFound:
graphNode := &channeldb.LightningNode{
PubKey: pub,
HaveNodeAnnouncement: true,
Addresses: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Features: lnwire.NewFeatureVector(nil),
AuthSig: testSig,
}
if err := d.db.AddLightningNode(graphNode); err != nil {
return nil, err
}
case err != nil:
return nil, err
}
return dbNode, nil
}
nodeKey, err := randKey()
if err != nil {
return nil, err
}
dbNode := &channeldb.LightningNode{
PubKey: nodeKey,
HaveNodeAnnouncement: true,
Addresses: []net.Addr{
&net.TCPAddr{
IP: bytes.Repeat([]byte("a"), 16),
},
},
Features: lnwire.NewFeatureVector(nil),
AuthSig: testSig,
}
if err := d.db.AddLightningNode(dbNode); err != nil {
return nil, err
}
return dbNode, nil
}
vertex1, err := fetchNode(node1)
if err != nil {
return nil, nil, err
}
vertex2, err := fetchNode(node2)
if err != nil {
return nil, nil, err
}
var lnNode1, lnNode2 *btcec.PublicKey
node1Bytes := vertex1.PubKey.SerializeCompressed()
node2Bytes := vertex2.PubKey.SerializeCompressed()
if bytes.Compare(node1Bytes, node2Bytes) == -1 {
lnNode1 = vertex1.PubKey
lnNode2 = vertex2.PubKey
} else {
lnNode1 = vertex2.PubKey
lnNode2 = vertex1.PubKey
}
chanID := randChanID()
edge := &channeldb.ChannelEdgeInfo{
ChannelID: chanID.ToUint64(),
NodeKey1: lnNode1,
NodeKey2: lnNode2,
BitcoinKey1: vertex1.PubKey,
BitcoinKey2: vertex2.PubKey,
Capacity: capacity,
}
if err := d.db.AddChannelEdge(edge); err != nil {
return nil, nil, err
}
edgePolicy := &channeldb.ChannelEdgePolicy{
Signature: testSig,
ChannelID: chanID.ToUint64(),
LastUpdate: time.Now(),
TimeLockDelta: 10,
MinHTLC: btcutil.Amount(1),
FeeBaseMSat: btcutil.Amount(10),
FeeProportionalMillionths: btcutil.Amount(10000),
Flags: 0,
}
if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
return nil, nil, err
}
edgePolicy = &channeldb.ChannelEdgePolicy{
Signature: testSig,
ChannelID: chanID.ToUint64(),
LastUpdate: time.Now(),
TimeLockDelta: 10,
MinHTLC: btcutil.Amount(1),
FeeBaseMSat: btcutil.Amount(10),
FeeProportionalMillionths: btcutil.Amount(10000),
Flags: 1,
}
if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
return nil, nil, err
}
return &ChannelEdge{
Channel: Channel{
ChanID: chanID,
Capacity: capacity,
},
Peer: dbNode{
node: vertex1,
},
},
&ChannelEdge{
Channel: Channel{
ChanID: chanID,
Capacity: capacity,
},
Peer: dbNode{
node: vertex2,
},
},
nil
}
// memChannelGraph is a implementation of the autopilot.ChannelGraph backed by
// an in-memory graph.
type memChannelGraph struct {
graph map[NodeID]memNode
}
// A compile time assertion to ensure memChannelGraph meets the
// autopilot.ChannelGraph interface.
var _ ChannelGraph = (*memChannelGraph)(nil)
// newMemChannelGraph creates a new blank in-memory channel graph
// implementation.
func newMemChannelGraph() *memChannelGraph {
return &memChannelGraph{
graph: make(map[NodeID]memNode),
}
}
// ForEachNode is a higher-order function that should be called once for each
// connected node within the channel graph. If the passed callback returns an
// error, then execution should be terminated.
//
// NOTE: Part of the autopilot.ChannelGraph interface.
func (m memChannelGraph) ForEachNode(cb func(Node) error) error {
for _, node := range m.graph {
if err := cb(node); err != nil {
return err
}
}
return nil
}
// randChanID generates a new random channel ID.
func randChanID() lnwire.ShortChannelID {
return lnwire.NewShortChanIDFromInt(uint64(prand.Int63()))
}
// randKey returns a random public key.
func randKey() (*btcec.PublicKey, error) {
priv, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
return nil, err
}
return priv.PubKey(), nil
}
// addRandChannel creates a new channel two target nodes. This function is
// meant to aide in the generation of random graphs for use within test cases
// the exercise the autopilot package.
func (m *memChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
capacity btcutil.Amount) (*ChannelEdge, *ChannelEdge, error) {
var (
vertex1, vertex2 memNode
ok bool
)
if node1 != nil {
vertex1, ok = m.graph[NewNodeID(node1)]
if !ok {
vertex1 = memNode{
pub: node1,
}
}
} else {
newPub, err := randKey()
if err != nil {
return nil, nil, err
}
vertex1 = memNode{
pub: newPub,
}
}
if node2 != nil {
vertex2, ok = m.graph[NewNodeID(node2)]
if !ok {
vertex2 = memNode{
pub: node2,
}
}
} else {
newPub, err := randKey()
if err != nil {
return nil, nil, err
}
vertex2 = memNode{
pub: newPub,
}
}
channel := Channel{
ChanID: randChanID(),
Capacity: capacity,
}
edge1 := ChannelEdge{
Channel: channel,
Peer: vertex2,
}
vertex1.chans = append(vertex1.chans, edge1)
edge2 := ChannelEdge{
Channel: channel,
Peer: vertex1,
}
vertex2.chans = append(vertex2.chans, edge2)
m.graph[NewNodeID(vertex1.pub)] = vertex1
m.graph[NewNodeID(vertex2.pub)] = vertex2
return &edge1, &edge2, nil
}
// memNode is a purely in-memory implementation of the autopilot.Node
// interface.
type memNode struct {
pub *btcec.PublicKey
chans []ChannelEdge
addrs []net.Addr
}
// A compile time assertion to ensure memNode meets the autopilot.Node
// interface.
var _ Node = (*memNode)(nil)
// PubKey is the identity public key of the node. This will be used to attempt
// to target a node for channel opening by the main autopilot agent.
//
// NOTE: Part of the autopilot.Node interface.
func (m memNode) PubKey() *btcec.PublicKey {
return m.pub
}
// Addrs returns a slice of publicly reachable public TCP addresses that the
// peer is known to be listening on.
//
// NOTE: Part of the autopilot.Node interface.
func (m memNode) Addrs() []net.Addr {
return m.addrs
}
// ForEachChannel is a higher-order function that will be used to iterate
// through all edges emanating from/to the target node. For each active
// channel, this function should be called with the populated ChannelEdge that
// describes the active channel.
//
// NOTE: Part of the autopilot.Node interface.
func (m memNode) ForEachChannel(cb func(ChannelEdge) error) error {
for _, channel := range m.chans {
if err := cb(channel); err != nil {
return err
}
}
return nil
}

151
autopilot/interface.go Normal file
View File

@ -0,0 +1,151 @@
package autopilot
import (
"net"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
// Node node is an interface which represents n abstract vertex within the
// channel graph. All nodes should have at least a single edge to/from them
// within the graph.
//
// TODO(roasbeef): combine with routing.ChannelGraphSource
type Node interface {
// PubKey is the identity public key of the node. This will be used to
// attempt to target a node for channel opening by the main autopilot
// agent.
PubKey() *btcec.PublicKey
// Addrs returns a slice of publicly reachable public TCP addresses
// that the peer is known to be listening on.
Addrs() []net.Addr
// ForEachChannel is a higher-order function that will be used to
// iterate through all edges emanating from/to the target node. For
// each active channel, this function should be called with the
// populated ChannelEdge that describes the active channel.
ForEachChannel(func(ChannelEdge) error) error
}
// Channel is a simple struct which contains relevant details of a particular
// channel within the channel graph. The fields in this struct may be used a
// signals for various AttachmentHeuristic implementations.
type Channel struct {
// ChanID is the short channel ID for this channel as defined within
// BOLT-0007.
ChanID lnwire.ShortChannelID
// Capacity is the capacity of the channel expressed in satoshis.
Capacity btcutil.Amount
// FundedAmt is the amount the local node funded into the target
// channel.
//
// TODO(roasbeef): need this?
FundedAmt btcutil.Amount
// Node is the peer that this channel has been established with.
Node NodeID
// TODO(roasbeef): also add other traits?
// * fee, timelock, etc
}
// ChannelEdge is a struct that holds details concerning a channel, but also
// contains a reference to the Node that this channel connects to as a directed
// edge witihn the graph. The existence of this reference to the connected node
// will allow callers to traverse the graph in an object-oriented manner.
type ChannelEdge struct {
// Channel contains the attributes of this channel.
Channel
// Peer is the peer that this channel creates an edge to in the channel
// graph.
Peer Node
}
// ChannelGraph in an interface that represents a traversable channel graph.
// The autopilot agent will use this interface as its source of graph traits in
// order to make decisions concerning which channels should be opened, and to
// whom.
//
// TODO(roasbeef): abstract??
type ChannelGraph interface {
// ForEachNode is a higher-order function that should be called once
// for each connected node within the channel graph. If the passed
// callback returns an error, then execution should be terminated.
ForEachNode(func(Node) error) error
}
// AttachmentDirective describes a channel attachment proscribed by an
// AttachmentHeuristic. It details to which node a channel should be created
// to, and also the parameters which should be used in the channel creation.
type AttachmentDirective struct {
// PeerKey is the target node for this attachment directive. It can be
// identified by it's public key, and therefore can be used along with
// a ChannelOpener implementation to execute the directive.
PeerKey *btcec.PublicKey
// ChanAmt is the size of the channel that should be opened, expressed
// in satoshis.
ChanAmt btcutil.Amount
// Addrs is a list of addresses that the target peer may be reachable
// at.
Addrs []net.Addr
}
// AttachmentHeuristic is one of the primary interfaces within this package.
// Implementations of this interface will be used to implement a control system
// which automatically regulates channels of a particular agent, attempting to
// optimize channels opened/closed based on various heuristics. The purpose of
// the interface is to allow an auto-pilot agent to decide if it needs more
// channels, and if so, which exact channels should be opened.
type AttachmentHeuristic interface {
// NeedMoreChans is a predicate that should return true if, given the
// passed parameters, and its internal state, more channels should be
// opened within the channel graph. If the heuristic decides that we do
// indeed need more channels, then the second argument returned will
// represent the amount of additional funds to be used towards creating
// channels.
NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, bool)
// Select is a method that given the current state of the channel
// graph, a set of nodes to ignore, and an amount of available funds,
// should return a set of attachment directives which describe which
// additional channels should be opened within the graph to push the
// heuristic back towards its equilibrium state.
Select(self *btcec.PublicKey, graph ChannelGraph, amtToUse btcutil.Amount,
skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error)
}
// ChannelController is a simple interface that allows an auto-pilot agent to
// open a channel within the graph to a target peer, close targeted channels,
// or add/remove funds from existing channels via a splice in/out mechanisms.
type ChannelController interface {
// OpenChannel opens a channel to a target peer, with a capacity of the
// specified amount. This function should un-block immediately after
// the funding transaction that marks the channel open has been
// broadcast.
OpenChannel(target *btcec.PublicKey, amt btcutil.Amount,
addrs []net.Addr) error
// CloseChannel attempts to close out the target channel.
//
// TODO(roasbeef): add force option?
CloseChannel(chanPoint *wire.OutPoint) error
// SpliceIn attempts to add additional funds to the target channel via
// a splice in mechanism. The new channel with an updated capacity
// should be returned.
SpliceIn(chanPoint *wire.OutPoint, amt btcutil.Amount) (*Channel, error)
// SpliceOut attempts to remove funds from an existing channels using a
// splice out mechanism. The removed funds from the channel should be
// returned to an output under the control of the backing wallet.
SpliceOut(chanPoint *wire.OutPoint, amt btcutil.Amount) (*Channel, error)
}

26
autopilot/log.go Normal file
View File

@ -0,0 +1,26 @@
package autopilot
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

271
autopilot/prefattach.go Normal file
View File

@ -0,0 +1,271 @@
package autopilot
import (
"fmt"
prand "math/rand"
"time"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcutil"
)
// ConstrainedPrefAttachment is an implementation of the AttachmentHeuristic
// interface that implement a constrained non-linear preferential attachment
// heuristic. This means that given a threshold to allocate to automatic
// channel establishment, the heuristic will attempt to favor connecting to
// nodes which already have a set amount of links, selected by sampling from a
// power law distribution. The attachment ins non-linear in that it favors
// nodes with a higher in-degree but less so that regular linear preferential
// attachment. As a result, this creates smaller and less clusters than regular
// linear preferential attachment.
//
// TODO(roasbeef): BA, with k=-3
type ConstrainedPrefAttachment struct {
minChanSize btcutil.Amount
maxChanSize btcutil.Amount
chanLimit uint16
threshold float64
}
// NewPrefAttchment creates a new instance of a ConstrainedPrefAttachment
// heuristics given bounds on allowed channel sizes, and an allocation amount
// which is interpreted as a percentage of funds that is to be committed to
// channels at all times.
func NewConstrainedPrefAttachment(minChanSize, maxChanSize btcutil.Amount,
chanLimit uint16, allocation float64) *ConstrainedPrefAttachment {
prand.Seed(time.Now().Unix())
return &ConstrainedPrefAttachment{
minChanSize: minChanSize,
chanLimit: chanLimit,
maxChanSize: maxChanSize,
threshold: allocation,
}
}
// A compile time assertion to ensure ConstrainedPrefAttachment meets the
// AttachmentHeuristic interface.
var _ AttachmentHeuristic = (*ConstrainedPrefAttachment)(nil)
// NeedMoreChans is a predicate that should return true if, given the passed
// parameters, and its internal state, more channels should be opened within
// the channel graph. If the heuristic decides that we do indeed need more
// channels, then the second argument returned will represent the amount of
// additional funds to be used towards creating channels.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (p *ConstrainedPrefAttachment) NeedMoreChans(channels []Channel,
funds btcutil.Amount) (btcutil.Amount, bool) {
// If we're already over our maximum allowed number of channels, then
// we'll instruct the controller not to create any more channels.
if len(channels) >= int(p.chanLimit) {
return 0, false
}
// First, we'll tally up the total amount of funds that are currently
// present within the set of active channels.
var totalChanAllocation btcutil.Amount
for _, channel := range channels {
totalChanAllocation += channel.Capacity
}
// With this value known, we'll now compute the total amount of fund
// allocated across regular utxo's and channel utxo's.
totalFunds := funds + totalChanAllocation
// Once the total amount has been computed, we then calculate the
// fraction of funds currently allocated to channels.
fundsFraction := float64(totalChanAllocation) / float64(totalFunds)
// If this fraction is below our threshold, then we'll return true, to
// indicate the controller should call Select to obtain a candidate set
// of channels to attempt to open.
needMore := fundsFraction < p.threshold
if !needMore {
return 0, false
}
// Now that we know we need more funds, we'll compute the amount of
// additional funds we should allocate towards channels.
targetAllocation := btcutil.Amount(float64(totalFunds) * p.threshold)
fundsAvailable := targetAllocation - totalChanAllocation
return fundsAvailable, true
}
// nodeID is a simple type that holds a EC public key serialized in compressed
// format.
type NodeID [33]byte
// NewNodeID creates a new nodeID from a passed public key.
func NewNodeID(pub *btcec.PublicKey) NodeID {
var n NodeID
copy(n[:], pub.SerializeCompressed())
return n
}
// Select returns a candidate set of attachment directives that should be
// executed based on the current internal state, the state of the channel
// graph, the set of nodes we should exclude, and the amount of funds
// available. The heuristic employed by this method is one that attempts to
// promote a scale-free network globally, via local attachment preferences for
// new nodes joining the network with an amount of available funds to be
// allocated to channels. Specifically, we consider the degree of each node
// (and the flow in/out of the node available via its open channels) and
// utilize the BarabásiAlbert model to drive our recommended attachment
// heuristics. If implemented globally for each new participant, this results
// in a channel graph that is scale-free and follows a power law distribution
// with k=-3.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph,
fundsAvailable btcutil.Amount, skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) {
// TODO(roasbeef): rename?
var directives []AttachmentDirective
if fundsAvailable < p.minChanSize {
return directives, nil
}
// We'll continue our attachment loop until we've exhausted the current
// amount of available funds.
visited := make(map[NodeID]struct{})
for i := uint16(0); i < p.chanLimit; i++ {
// selectionSlice will be used to randomly select a node
// according to a power law distribution. For each connected
// edge, we'll add an instance of the node to this slice. Thus,
// for a given node, the probability that we'll attach to it
// is: k_i / sum(k_j), where k_i is the degree of the target
// node, and k_j is the degree of all other nodes i != j. This
// implements the classic BarabásiAlbert model for
// preferential attachment.
var selectionSlice []Node
// For each node, and each channel that the node has, we'll add
// an instance of that node to the selection slice above.
// This'll slice where the frequency of each node is equivalent
// to the number of channels that connect to it.
//
// TODO(roasbeef): add noise to make adversarially resistant?
if err := g.ForEachNode(func(node Node) error {
nID := NewNodeID(node.PubKey())
// Once a node has already been attached to, we'll
// ensure that it isn't factored into any further
// decisions within this round.
if _, ok := visited[nID]; ok {
return nil
}
// If we come across ourselves, them we'll continue in
// order to avoid attempting to make a channel with
// ourselves.
if node.PubKey().IsEqual(self) {
return nil
}
// Additionally, if this node is in the backlist, then
// we'll skip it.
if _, ok := skipNodes[nID]; ok {
return nil
}
// For initial bootstrap purposes, if a node doesn't
// have any channels, then we'll ensure that it has at
// least one item in the selection slice.
//
// TODO(roasbeef): make conditional?
selectionSlice = append(selectionSlice, node)
// For each active channel the node has, we'll add an
// additional channel to the selection slice to
// increase their weight.
if err := node.ForEachChannel(func(channel ChannelEdge) error {
selectionSlice = append(selectionSlice, node)
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
// If no nodes at all were accumulated, then we'll exit early
// as there are no eligible candidates.
if len(selectionSlice) == 0 {
break
}
// Given our selection slice, we'll now generate a random index
// into this slice. The node we select will be recommended by
// us to create a channel to.
selectedIndex := prand.Int31n(int32(len(selectionSlice)))
selectedNode := selectionSlice[selectedIndex]
// TODO(roasbeef): cap on num channels to same participant?
// With the node selected, we'll add this (node, amount) tuple
// to out set of recommended directives.
pub := selectedNode.PubKey()
directives = append(directives, AttachmentDirective{
// TODO(roasbeef): need curve?
PeerKey: &btcec.PublicKey{
X: pub.X,
Y: pub.Y,
},
Addrs: selectedNode.Addrs(),
})
// With the node selected, we'll add it to the set of visited
// nodes to avoid attaching to it again.
visited[NewNodeID(selectedNode.PubKey())] = struct{}{}
}
numSelectedNodes := int64(len(directives))
switch {
// If we have enough available funds to distribute the maximum channel
// size for each of the selected peers to attach to, then we'll
// allocate the maximum amount to each peer.
case int64(fundsAvailable) >= numSelectedNodes*int64(p.maxChanSize):
for i := 0; i < int(numSelectedNodes); i++ {
directives[i].ChanAmt = p.maxChanSize
}
return directives, nil
// Otherwise, we'll greedily allocate our funds to the channels
// successively until we run out of available funds, or can't create a
// channel above the min channel size.
case int64(fundsAvailable) < numSelectedNodes*int64(p.maxChanSize):
i := 0
for fundsAvailable > p.minChanSize {
// We'll attempt to allocate the max channel size
// initially. If we don't have enough funds to do this,
// then we'll allocate the remainder of the funds
// available to the channel.
delta := p.maxChanSize
if fundsAvailable-delta < 0 {
delta = fundsAvailable
}
directives[i].ChanAmt = delta
fundsAvailable -= delta
i++
}
// We'll slice the initial set of directives to properly
// reflect the amount of funds we were able to allocate.
return directives[:i:i], nil
default:
return nil, fmt.Errorf("err")
}
}

View File

@ -0,0 +1,635 @@
package autopilot
import (
"io/ioutil"
"os"
"testing"
"time"
prand "math/rand"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcutil"
)
func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
randChanID := func() lnwire.ShortChannelID {
return lnwire.NewShortChanIDFromInt(uint64(prand.Int63()))
}
testCases := []struct {
channels []Channel
walletAmt btcutil.Amount
needMore bool
amtAvailable btcutil.Amount
}{
// Many available funds, but already have too many active open
// channels.
{
[]Channel{
{
ChanID: randChanID(),
Capacity: btcutil.Amount(prand.Int31()),
},
{
ChanID: randChanID(),
Capacity: btcutil.Amount(prand.Int31()),
},
{
ChanID: randChanID(),
Capacity: btcutil.Amount(prand.Int31()),
},
},
btcutil.Amount(btcutil.SatoshiPerBitcoin * 10),
false,
0,
},
// Ratio of funds in channels and total funds meets the
// threshold.
{
[]Channel{
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
},
btcutil.Amount(btcutil.SatoshiPerBitcoin * 2),
false,
0,
},
// Ratio of funds in channels and total funds is below the
// threshold. We have 10 BTC allocated amongst channels and
// funds, atm. We're targeting 50%, so 5 BTC should be
// allocated. Only 1 BTC is atm, so 4 BTC should be
// recommended.
{
[]Channel{
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
},
btcutil.Amount(btcutil.SatoshiPerBitcoin * 9),
true,
btcutil.Amount(btcutil.SatoshiPerBitcoin * 4),
},
// Ratio of funds in channels and total funds is below the
// threshold. We have 14 BTC total amongst the wallet's
// balance, and our currently opened channels. Since we're
// targeting a 50% allocation, we should commit 7 BTC. The
// current channels commit 4 BTC, so we should expected 3 bTC
// to be committed.
{
[]Channel{
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin * 3),
},
},
btcutil.Amount(btcutil.SatoshiPerBitcoin * 10),
true,
btcutil.Amount(btcutil.SatoshiPerBitcoin * 3),
},
// Ratio of funds in channels and total funds is above the
// threshold.
{
[]Channel{
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
{
ChanID: randChanID(),
Capacity: btcutil.Amount(btcutil.SatoshiPerBitcoin),
},
},
btcutil.Amount(btcutil.SatoshiPerBitcoin),
false,
0,
},
}
prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
for i, testCase := range testCases {
amtToAllocate, needMore := prefAttatch.NeedMoreChans(testCase.channels,
testCase.walletAmt)
if amtToAllocate != testCase.amtAvailable {
t.Fatalf("test #%v: expected %v, got %v",
i, testCase.amtAvailable, amtToAllocate)
}
if needMore != testCase.needMore {
t.Fatalf("test #%v: expected %v, got %v",
i, testCase.needMore, needMore)
}
}
}
type genGraphFunc func() (testGraph, func(), error)
type testGraph interface {
ChannelGraph
addRandChannel(*btcec.PublicKey, *btcec.PublicKey,
btcutil.Amount) (*ChannelEdge, *ChannelEdge, error)
}
func newDiskChanGraph() (testGraph, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
}
return &databaseChannelGraph{
db: cdb.ChannelGraph(),
}, cleanUp, nil
}
var _ testGraph = (*databaseChannelGraph)(nil)
func newMemChanGraph() (testGraph, func(), error) {
return newMemChannelGraph(), nil, nil
}
var _ testGraph = (*memChannelGraph)(nil)
var chanGraphs = []struct {
name string
genFunc genGraphFunc
}{
{
name: "disk_graph",
genFunc: newDiskChanGraph,
},
{
name: "mem_graph",
genFunc: newMemChanGraph,
},
}
// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed en
// empty graph, the Select function always detects the state, and returns nil.
// Otherwise, it would be possible for the main Select loop to entire an
// infinite loop.
func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) {
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
// First, we'll generate a random key that represents "us", and create
// a new instance of the heuristic with our set parameters.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate self key: %v", err)
}
prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// With the necessary state initialized, we'll not
// attempt to select a set of candidates channel for
// creation given the current state of the graph.
const walletFunds = btcutil.SatoshiPerBitcoin
directives, err := prefAttatch.Select(self, graph,
walletFunds, skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// We shouldn't have selected any new directives as we
// started with an empty graph.
if len(directives) != 0 {
t1.Fatalf("zero attachment directives "+
"should've been returned instead %v were",
len(directives))
}
})
if !success {
break
}
}
}
// TestConstrainedPrefAttachmentSelectTwoVertexes ensures that when passed a
// graph with only two eligible vertexes, then both are selected (without any
// repeats), and the funds are appropriately allocated across each peer.
func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttatch := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
// For this set, we'll load the memory graph with two
// nodes, and a random channel connecting them.
const chanCapacity = btcutil.SatoshiPerBitcoin
edge1, edge2, err := graph.addRandChannel(nil, nil, chanCapacity)
if err != nil {
t1.Fatalf("unable to generate channel: %v", err)
}
// With the necessary state initialized, we'll not
// attempt to select a set of candidates channel for
// creation given the current state of the graph.
const walletFunds = btcutil.SatoshiPerBitcoin * 10
directives, err := prefAttatch.Select(self, graph,
walletFunds, skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment directives: %v", err)
}
// Two new directives should have been selected, one
// for each node already present within the graph.
if len(directives) != 2 {
t1.Fatalf("two attachment directives should've been "+
"returned instead %v were", len(directives))
}
// The node attached to should be amongst the two edges
// created above.
for _, directive := range directives {
switch {
case directive.PeerKey.IsEqual(edge1.Peer.PubKey()):
case directive.PeerKey.IsEqual(edge2.Peer.PubKey()):
default:
t1.Fatalf("attache to unknown node: %x",
directive.PeerKey.SerializeCompressed())
}
// As the number of funds available exceed the
// max channel size, both edges should consume
// the maximum channel size.
if directive.ChanAmt != maxChanSize {
t1.Fatalf("max channel size should be allocated, "+
"instead %v was: ", maxChanSize)
}
}
})
if !success {
break
}
}
}
// TestConstrainedPrefAttachmentSelectInsufficientFunds ensures that if the
// balance of the backing wallet is below the set min channel size, then it
// never recommends candidates to attach to.
func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttatch := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
// Next, we'll attempt to select a set of candidates,
// passing zero for the amount of wallet funds. This
// should return an empty slice of directives.
directives, err := prefAttatch.Select(self, graph, 0,
skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
if len(directives) != 0 {
t1.Fatalf("zero attachment directives "+
"should've been returned instead %v were",
len(directives))
}
})
if !success {
break
}
}
}
// TestConstrainedPrefAttachmentSelectGreedyAllocation tests that if upon
// deciding a set of candidates, we're unable to evenly split our funds, then
// we attempt to greedily allocate all funds to each selected vertex (up to the
// max channel size).
func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttatch := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
const chanCapcity = btcutil.SatoshiPerBitcoin
// Next, we'll add 3 nodes to the graph, creating an
// "open triangle topology".
edge1, _, err := graph.addRandChannel(nil, nil,
chanCapcity)
if err != nil {
t1.Fatalf("unable to create channel: %v", err)
}
_, _, err = graph.addRandChannel(
edge1.Peer.PubKey(), nil, chanCapcity,
)
if err != nil {
t1.Fatalf("unable to create channel: %v", err)
}
// At this point, there should be three nodes in the
// graph, with node node having two edges.
numNodes := 0
twoChans := false
if err := graph.ForEachNode(func(n Node) error {
numNodes++
numChans := 0
err := n.ForEachChannel(func(c ChannelEdge) error {
numChans++
return nil
})
if err != nil {
return err
}
twoChans = twoChans || (numChans == 2)
return nil
}); err != nil {
t1.Fatalf("unable to traverse graph: %v", err)
}
if numNodes != 3 {
t1.Fatalf("expected 3 nodes, instead have: %v",
numNodes)
}
if !twoChans {
t1.Fatalf("expected node to have two channels")
}
// We'll now begin our test, modeling the available
// wallet balance to be 5.5 BTC. We're shooting for a
// 50/50 allocation, and have 3 BTC in channels. As a
// result, the heuristic should try to greedily
// allocate funds to channels.
const availableBalance = btcutil.SatoshiPerBitcoin * 2.5
directives, err := prefAttatch.Select(self, graph,
availableBalance, skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// Three directives should have been returned.
if len(directives) != 3 {
t1.Fatalf("expected 3 directives, instead "+
"got: %v", len(directives))
}
// The two directive should have the max channel amount
// allocated.
if directives[0].ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[0].ChanAmt)
}
if directives[1].ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[1].ChanAmt)
}
// The third channel should have been allocated the
// remainder, or 0.5 BTC.
if directives[2].ChanAmt != (btcutil.SatoshiPerBitcoin * 0.5) {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[2].ChanAmt)
}
})
if !success {
break
}
}
}
// TestConstrainedPrefAttachmentSelectSkipNodes ensures that if a node was
// already select for attachment, then that node is excluded from the set of
// candidate nodes.
func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
t.Parallel()
prand.Seed(time.Now().Unix())
const (
minChanSize = 0
maxChanSize = btcutil.Amount(btcutil.SatoshiPerBitcoin)
chanLimit = 3
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
}
if cleanup != nil {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttatch := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
// Next, we'll create a simple topology of two nodes,
// with a single channel connecting them.
const chanCapcity = btcutil.SatoshiPerBitcoin
_, _, err = graph.addRandChannel(nil, nil,
chanCapcity)
if err != nil {
t1.Fatalf("unable to create channel: %v", err)
}
// With our graph created, we'll now execute the Select
// function to recommend potential attachment
// candidates.
const availableBalance = btcutil.SatoshiPerBitcoin * 2.5
directives, err := prefAttatch.Select(self, graph,
availableBalance, skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// As the channel limit is three, and two nodes are
// present in the graph, both should be selected.
if len(directives) != 2 {
t1.Fatalf("expected two directives, instead "+
"got %v", len(directives))
}
// We'll simulate a channel update by adding the nodes
// we just establish channel with the to set of nodes
// to be skipped.
skipNodes[NewNodeID(directives[0].PeerKey)] = struct{}{}
skipNodes[NewNodeID(directives[1].PeerKey)] = struct{}{}
// If we attempt to make a call to the Select function,
// without providing any new information, then we
// should get no new directives as both nodes has
// already been attached to.
directives, err = prefAttatch.Select(self, graph,
availableBalance, skipNodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
if len(directives) != 0 {
t1.Fatalf("zero new directives should have been "+
"selected, but %v were", len(directives))
}
})
if !success {
break
}
}
}