package main import ( "fmt" "net" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) // chanController is an implementation of the autopilot.ChannelController // interface that's backed by a running lnd instance. type chanController struct { server *server } // OpenChannel opens a channel to a target peer, with a capacity of the // specified amount. This function should un-block immediately after the // funding transaction that marks the channel open has been broadcast. func (c *chanController) OpenChannel(target *btcec.PublicKey, amt btcutil.Amount, addrs []net.Addr) error { // We can't establish a channel if no addresses were provided for the // peer. if len(addrs) == 0 { return fmt.Errorf("Unable to create channel w/o an active " + "address") } // First, we'll check if we're already connected to the target peer. If // not, then we'll need to establish a connection. if _, err := c.server.FindPeer(target); err != nil { // TODO(roasbeef): try teach addr atplLog.Tracef("Connecting to %x to auto-create channel: ", target.SerializeCompressed()) lnAddr := &lnwire.NetAddress{ IdentityKey: target, ChainNet: activeNetParams.Net, } // We'll attempt to successively connect to each of the // advertised IP addresses until we've either exhausted the // advertised IP addresses, or have made a connection. var connected bool for _, addr := range addrs { // If the address doesn't already have a port, then // we'll assume the current default port. tcpAddr, ok := addr.(*net.TCPAddr) if !ok { return fmt.Errorf("TCP address required instead "+ "have %T", addr) } if tcpAddr.Port == 0 { tcpAddr.Port = defaultPeerPort } lnAddr.Address = tcpAddr // TODO(roasbeef): make perm connection in server after // chan open? err := c.server.ConnectToPeer(lnAddr, false) if err != nil { // If we weren't able to connect to the peer, // then we'll move onto the next. continue } connected = true break } // If we weren't able to establish a connection at all, then // we'll error out. if !connected { return fmt.Errorf("Unable to connect to %x", target.SerializeCompressed()) } } // With the connection established, we'll now establish our connection // to the target peer, waiting for the first update before we exit. feePerVSize, err := c.server.cc.feeEstimator.EstimateFeePerVSize(3) if err != nil { return err } // TODO(halseth): make configurable? minHtlc := lnwire.NewMSatFromSatoshis(1) updateStream, errChan := c.server.OpenChannel(target, amt, 0, minHtlc, feePerVSize, false) select { case err := <-errChan: // If we were not able to actually open a channel to the peer // for whatever reason, then we'll disconnect from the peer to // ensure we don't accumulate a bunch of unnecessary // connections. if err != nil { dcErr := c.server.DisconnectPeer(target) if dcErr != nil { atplLog.Errorf("Unable to disconnect from peer %v", target.SerializeCompressed()) } } return err case <-updateStream: return nil case <-c.server.quit: return nil } } func (c *chanController) CloseChannel(chanPoint *wire.OutPoint) error { return nil } func (c *chanController) SpliceIn(chanPoint *wire.OutPoint, amt btcutil.Amount) (*autopilot.Channel, error) { return nil, nil } func (c *chanController) SpliceOut(chanPoint *wire.OutPoint, amt btcutil.Amount) (*autopilot.Channel, error) { return nil, nil } // A compile time assertion to ensure chanController meets the // autopilot.ChannelController interface. var _ autopilot.ChannelController = (*chanController)(nil) // initAutoPilot initializes a new autopilot.Agent instance based on the passed // configuration struct. All interfaces needed to drive the pilot will be // registered and launched. func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) // First, we'll create the preferential attachment heuristic, // initialized with the passed auto pilot configuration parameters. // // TODO(roasbeef): switch here to dispatch specified heuristic minChanSize := svr.cc.wallet.Cfg.DefaultConstraints.DustLimit * 5 prefAttachment := autopilot.NewConstrainedPrefAttachment( minChanSize, maxFundingAmount, uint16(cfg.MaxChannels), cfg.Allocation, ) // With the heuristic itself created, we can now populate the remainder // of the items that the autopilot agent needs to perform its duties. self := svr.identityPriv.PubKey() pilotCfg := autopilot.Config{ Self: self, Heuristic: prefAttachment, ChanController: &chanController{svr}, WalletBalance: func() (btcutil.Amount, error) { return svr.cc.wallet.ConfirmedBalance(1) }, Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), MaxPendingOpens: 10, } // Next, we'll fetch the current state of open channels from the // database to use as initial state for the auto-pilot agent. activeChannels, err := svr.chanDB.FetchAllChannels() if err != nil { return nil, err } initialChanState := make([]autopilot.Channel, len(activeChannels)) for i, channel := range activeChannels { initialChanState[i] = autopilot.Channel{ ChanID: channel.ShortChanID, Capacity: channel.Capacity, Node: autopilot.NewNodeID(channel.IdentityPub), } } // Now that we have all the initial dependencies, we can create the // auto-pilot instance itself. pilot, err := autopilot.New(pilotCfg, initialChanState) if err != nil { return nil, err } // Finally, we'll need to subscribe to two things: incoming // transactions that modify the wallet's balance, and also any graph // topology updates. txnSubscription, err := svr.cc.wallet.SubscribeTransactions() if err != nil { return nil, err } graphSubscription, err := svr.chanRouter.SubscribeTopology() if err != nil { return nil, err } // We'll launch a goroutine to provide the agent with notifications // whenever the balance of the wallet changes. svr.wg.Add(2) go func() { defer txnSubscription.Cancel() defer svr.wg.Done() for { select { case txnUpdate := <-txnSubscription.ConfirmedTransactions(): pilot.OnBalanceChange(txnUpdate.Value) case <-svr.quit: return } } }() go func() { defer svr.wg.Done() for { select { // We won't act upon new unconfirmed transaction, as // we'll only use confirmed outputs when funding. // However, we will still drain this request in order // to avoid goroutine leaks, and ensure we promptly // read from the channel if available. case <-txnSubscription.UnconfirmedTransactions(): case <-svr.quit: return } } }() // We'll also launch a goroutine to provide the agent with // notifications for when the graph topology controlled by the node // changes. svr.wg.Add(1) go func() { defer graphSubscription.Cancel() defer svr.wg.Done() for { select { case topChange, ok := <-graphSubscription.TopologyChanges: // If the router is shutting down, then we will // as well. if !ok { return } for _, edgeUpdate := range topChange.ChannelEdgeUpdates { // If this isn't an advertisement by // the backing lnd node, then we'll // continue as we only want to add // channels that we've created // ourselves. if !edgeUpdate.AdvertisingNode.IsEqual(self) { continue } // If this is indeed a channel we // opened, then we'll convert it to the // autopilot.Channel format, and notify // the pilot of the new channel. chanNode := autopilot.NewNodeID( edgeUpdate.ConnectingNode, ) chanID := lnwire.NewShortChanIDFromInt( edgeUpdate.ChanID, ) edge := autopilot.Channel{ ChanID: chanID, Capacity: edgeUpdate.Capacity, Node: chanNode, } pilot.OnChannelOpen(edge) } // For each closed closed channel, we'll obtain // the chanID of the closed channel and send it // to the pilot. for _, chanClose := range topChange.ClosedChannels { chanID := lnwire.NewShortChanIDFromInt( chanClose.ChanID, ) pilot.OnChannelClose(chanID) } case <-svr.quit: return } } }() return pilot, nil }