package main import ( "sync" "sync/atomic" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/rt/graph" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" "google.golang.org/grpc" ) const ( // TODO(roasbeef): tune msgBufferSize = 50 ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper // struct is used internally within the funding manager to track and progress // the funding workflow initiated by incoming/outgoing methods from the target // peer. Additionally, this struct houses a response and error channel which is // used to respond to the caller in the case a channel workflow is initiated // via a local signal such as RPC. // TODO(roasbeef): actually use the context package // * deadlines, etc. type reservationWithCtx struct { reservation *lnwallet.ChannelReservation peer *peer updates chan *lnrpc.OpenStatusUpdate err chan error } // initFundingMsg is sent by an outside sub-system to the funding manager in // order to kick-off a funding workflow with a specified target peer. The // original request which defines the parameters of the funding workflow are // embedded within this message giving the funding manager full context w.r.t // the workflow. type initFundingMsg struct { peer *peer *openChanReq } // fundingRequestMsg couples an lnwire.SingleFundingRequest message with the // peer who sent the message. This allows the funding manager to queue a // response directly to the peer, progressing the funding workflow. type fundingRequestMsg struct { msg *lnwire.SingleFundingRequest peer *peer } // fundingResponseMsg couples an lnwire.SingleFundingResponse message with the // peer who sent the message. This allows the funding manager to queue a // response directly to the peer, progressing the funding workflow. type fundingResponseMsg struct { msg *lnwire.SingleFundingResponse peer *peer } // fundingCompleteMsg couples an lnwire.SingleFundingComplete message with the // peer who sent the message. This allows the funding manager to queue a // response directly to the peer, progressing the funding workflow. type fundingCompleteMsg struct { msg *lnwire.SingleFundingComplete peer *peer } // fundingSignCompleteMsg couples an lnwire.SingleFundingSignComplete message // with the peer who sent the message. This allows the funding manager to // queue a response directly to the peer, progressing the funding workflow. type fundingSignCompleteMsg struct { msg *lnwire.SingleFundingSignComplete peer *peer } // fundingOpenMsg couples an lnwire.SingleFundingOpenProof message // with the peer who sent the message. This allows the funding manager to // queue a response directly to the peer, progressing the funding workflow. type fundingOpenMsg struct { msg *lnwire.SingleFundingOpenProof peer *peer } // fundingErrorMsg couples an lnwire.ErrorGeneric message // with the peer who sent the message. This allows the funding // manager properly process the error. type fundingErrorMsg struct { err *lnwire.ErrorGeneric peer *peer } // pendingChannels is a map instantiated per-peer which tracks all active // pending single funded channels indexed by their pending channel identifier. type pendingChannels map[uint64]*reservationWithCtx // fundingManager acts as an orchestrator/bridge between the wallet's // 'ChannelReservation' workflow, and the wire protocol's funding initiation // messages. Any requests to initiate the funding workflow for a channel, // either kicked-off locally, or remotely is handled by the funding manager. // Once a channel's funding workflow has been completed, any local callers, the // local peer, and possibly the remote peer are notified of the completion of // the channel workflow. Additionally, any temporary or permanent access // controls between the wallet and remote peers are enforced via the funding // manager. type fundingManager struct { // MUST be used atomically. started int32 stopped int32 // channelReservations is a map which houses the state of all pending // funding workflows. resMtx sync.RWMutex activeReservations map[int32]pendingChannels // wallet is the daemon's internal Lightning enabled wallet. wallet *lnwallet.LightningWallet breachAribter *breachArbiter // fundingMsgs is a channel which receives wrapped wire messages // related to funding workflow from outside peers. fundingMsgs chan interface{} // queries is a channel which receives requests to query the internal // state of the funding manager. queries chan interface{} // fundingRequests is a channel used to receive channel initiation // requests from a local sub-system within the daemon. fundingRequests chan *initFundingMsg quit chan struct{} wg sync.WaitGroup } // newFundingManager creates and initializes a new instance of the // fundingManager. func newFundingManager(w *lnwallet.LightningWallet, b *breachArbiter) *fundingManager { return &fundingManager{ wallet: w, breachAribter: b, activeReservations: make(map[int32]pendingChannels), fundingMsgs: make(chan interface{}, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize), queries: make(chan interface{}, 1), quit: make(chan struct{}), } } // Start launches all helper goroutines required for handling requests sent // to the funding manager. func (f *fundingManager) Start() error { if atomic.AddInt32(&f.started, 1) != 1 { // TODO(roasbeef): CAS instead return nil } fndgLog.Tracef("Funding manager running") f.wg.Add(1) // TODO(roasbeef): tune go f.reservationCoordinator() return nil } // Start signals all helper goroutines to execute a graceful shutdown. This // method will block until all goroutines have exited. func (f *fundingManager) Stop() error { if atomic.AddInt32(&f.stopped, 1) != 1 { return nil } fndgLog.Infof("Funding manager shutting down") close(f.quit) f.wg.Wait() return nil } type numPendingReq struct { resp chan uint32 } // NumPendingChannels returns the number of pending channels currently // progressing through the reservation workflow. func (f *fundingManager) NumPendingChannels() uint32 { resp := make(chan uint32, 1) req := &numPendingReq{resp} f.queries <- req return <-resp } type pendingChannel struct { peerId int32 identityPub *btcec.PublicKey channelPoint *wire.OutPoint capacity btcutil.Amount localBalance btcutil.Amount remoteBalance btcutil.Amount } type pendingChansReq struct { resp chan []*pendingChannel } // PendingChannels returns a slice describing all the channels which are // currently pending at the last state of the funding workflow. func (f *fundingManager) PendingChannels() []*pendingChannel { resp := make(chan []*pendingChannel, 1) req := &pendingChansReq{resp} f.queries <- req return <-resp } // reservationCoordinator is the primary goroutine tasked with progressing the // funding workflow between the wallet, and any outside peers or local callers. // // NOTE: This MUST be run as a goroutine. func (f *fundingManager) reservationCoordinator() { out: for { select { case msg := <-f.fundingMsgs: switch fmsg := msg.(type) { case *fundingRequestMsg: f.handleFundingRequest(fmsg) case *fundingResponseMsg: f.handleFundingResponse(fmsg) case *fundingCompleteMsg: f.handleFundingComplete(fmsg) case *fundingSignCompleteMsg: f.handleFundingSignComplete(fmsg) case *fundingOpenMsg: f.handleFundingOpen(fmsg) case *fundingErrorMsg: f.handleErrorGenericMsg(fmsg) } case req := <-f.fundingRequests: f.handleInitFundingMsg(req) case req := <-f.queries: switch msg := req.(type) { case *numPendingReq: f.handleNumPending(msg) case *pendingChansReq: f.handlePendingChannels(msg) } case <-f.quit: break out } } f.wg.Done() } // handleNumPending handles a request for the total number of pending channels. func (f *fundingManager) handleNumPending(msg *numPendingReq) { var numPending uint32 for _, peerChannels := range f.activeReservations { numPending += uint32(len(peerChannels)) } msg.resp <- numPending } // handlePendingChannels responds to a request for details concerning all // currently pending channels waiting for the final phase of the funding // workflow (funding txn confirmation). func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { var pendingChannels []*pendingChannel for peerID, peerChannels := range f.activeReservations { for _, pendingChan := range peerChannels { peer := pendingChan.peer res := pendingChan.reservation localFund := res.OurContribution().FundingAmount remoteFund := res.TheirContribution().FundingAmount pendingChan := &pendingChannel{ peerId: peerID, identityPub: peer.addr.IdentityKey, channelPoint: res.FundingOutpoint(), capacity: localFund + remoteFund, localBalance: localFund, remoteBalance: remoteFund, } pendingChannels = append(pendingChannels, pendingChan) } } msg.resp <- pendingChannels } // processFundingRequest sends a message to the fundingManager allowing it to // initiate the new funding workflow with the source peer. func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest, peer *peer) { f.fundingMsgs <- &fundingRequestMsg{msg, peer} } // handleFundingRequest creates an initial 'ChannelReservation' within // the wallet, then responds to the source peer with a single funder response // message progressing the funding workflow. // TODO(roasbeef): add error chan to all, let channelManager handle // error+propagate func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { // Check number of pending channels to be smaller than maximum allowed // number and send ErrorGeneric to remote peer if condition is violated. if len(f.activeReservations[fmsg.peer.id]) >= cfg.MaxPendingChannels { errMsg := &lnwire.ErrorGeneric{ ChannelPoint: &wire.OutPoint{ Hash: wire.ShaHash{}, Index: 0, }, Problem: "Number of pending channels exceed maximum", Code: lnwire.ErrorMaxPendingChannels, PendingChannelID: fmsg.msg.ChannelID, } fmsg.peer.queueMsg(errMsg, nil) return } msg := fmsg.msg amt := msg.FundingAmount delay := msg.CsvDelay // TODO(roasbeef): error if funding flow already ongoing fndgLog.Infof("Recv'd fundingRequest(amt=%v, delay=%v, pendingId=%v) "+ "from peerID(%v)", amt, delay, msg.ChannelID, fmsg.peer.id) ourDustLimit := lnwallet.DefaultDustLimit() theirDustlimit := msg.DustLimit // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the reservation // attempt may be rejected. Note that since we're on the responding // side of a single funder workflow, we don't commit any funds to the // channel ourselves. // TODO(roasbeef): passing num confs 1 is irrelevant here, make signed? // TODO(roasbeef): assuming this was an inbound connection, replace // port with default advertised port reservation, err := f.wallet.InitChannelReservation(amt, 0, fmsg.peer.addr.IdentityKey, fmsg.peer.addr.Address, 1, delay, ourDustLimit) if err != nil { // TODO(roasbeef): push ErrorGeneric message fndgLog.Errorf("Unable to initialize reservation: %v", err) fmsg.peer.Disconnect() return } reservation.SetTheirDustLimit(theirDustlimit) // Once the reservation has been created successfully, we add it to this // peers map of pending reservations to track this particular reservation // until either abort or completion. f.resMtx.Lock() if _, ok := f.activeReservations[fmsg.peer.id]; !ok { f.activeReservations[fmsg.peer.id] = make(pendingChannels) } f.activeReservations[fmsg.peer.id][msg.ChannelID] = &reservationWithCtx{ reservation: reservation, peer: fmsg.peer, } f.resMtx.Unlock() // With our portion of the reservation initialied, process the // initiators contribution to the channel. _, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params) if err != nil { fndgLog.Errorf("Unable to extract addresses from script: %v", err) return } contribution := &lnwallet.ChannelContribution{ FundingAmount: amt, MultiSigKey: msg.ChannelDerivationPoint, CommitKey: msg.CommitmentKey, DeliveryAddress: addrs[0], CsvDelay: delay, } if err := reservation.ProcessSingleContribution(contribution); err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) fmsg.peer.Disconnect() return } fndgLog.Infof("Sending fundingResp for pendingID(%v)", msg.ChannelID) // With the initiator's contribution recorded, response with our // contribution in the next message of the workflow. ourContribution := reservation.OurContribution() deliveryScript, err := txscript.PayToAddrScript(ourContribution.DeliveryAddress) if err != nil { fndgLog.Errorf("unable to convert address to pkscript: %v", err) return } fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID, ourContribution.RevocationKey, ourContribution.CommitKey, ourContribution.MultiSigKey, ourContribution.CsvDelay, deliveryScript, ourDustLimit) fmsg.peer.queueMsg(fundingResp, nil) } // processFundingRequest sends a message to the fundingManager allowing it to // continue the second phase of a funding workflow with the target peer. func (f *fundingManager) processFundingResponse(msg *lnwire.SingleFundingResponse, peer *peer) { f.fundingMsgs <- &fundingResponseMsg{msg, peer} } // handleFundingResponse processes a response to the workflow initiation sent // by the remote peer. This message then queues a message with the funding // outpoint, and a commitment signature to the remote peer. func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { msg := fmsg.msg peerID := fmsg.peer.id chanID := fmsg.msg.ChannelID sourcePeer := fmsg.peer resCtx, err := f.getReservationCtx(peerID, chanID) if err != nil { fndgLog.Warnf("can' find reservation (peerID:%v, chanID:%v)", peerID, chanID) return } fndgLog.Infof("Recv'd fundingResponse for pendingID(%v)", msg.ChannelID) resCtx.reservation.SetTheirDustLimit(msg.DustLimit) // The remote node has responded with their portion of the channel // contribution. At this point, we can process their contribution which // allows us to construct and sign both the commitment transaction, and // the funding transaction. _, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params) if err != nil { fndgLog.Errorf("Unable to extract addresses from script: %v", err) resCtx.err <- err return } contribution := &lnwallet.ChannelContribution{ FundingAmount: 0, MultiSigKey: msg.ChannelDerivationPoint, CommitKey: msg.CommitmentKey, DeliveryAddress: addrs[0], RevocationKey: msg.RevocationKey, CsvDelay: msg.CsvDelay, } if err := resCtx.reservation.ProcessContribution(contribution); err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", sourcePeer, err) fmsg.peer.Disconnect() resCtx.err <- err return } // Now that we have their contribution, we can extract, then send over // both the funding out point and our signature for their version of // the commitment transaction to the remote peer. outPoint := resCtx.reservation.FundingOutpoint() _, sig := resCtx.reservation.OurSignatures() commitSig, err := btcec.ParseSignature(sig, btcec.S256()) if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) resCtx.err <- err return } // Register a new barrier for this channel to properly synchronize with // the peer's readHandler once the channel is open. fmsg.peer.barrierInits <- *outPoint fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint, chanID) revocationKey := resCtx.reservation.OurContribution().RevocationKey obsfucator := resCtx.reservation.StateNumObfuscator() fundingComplete := lnwire.NewSingleFundingComplete(chanID, outPoint, commitSig, revocationKey, obsfucator) sourcePeer.queueMsg(fundingComplete, nil) } // processFundingComplete queues a funding complete message coupled with the // source peer to the fundingManager. func (f *fundingManager) processFundingComplete(msg *lnwire.SingleFundingComplete, peer *peer) { f.fundingMsgs <- &fundingCompleteMsg{msg, peer} } // handleFundingComplete progresses the funding workflow when the daemon is on // the responding side of a single funder workflow. Once this message has been // processed, a signature is sent to the remote peer allowing it to broadcast // the funding transaction, progressing the workflow into the final stage. func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) { resCtx, err := f.getReservationCtx(fmsg.peer.id, fmsg.msg.ChannelID) if err != nil { fndgLog.Warnf("can' find reservation (peerID:%v, chanID:%v)", fmsg.peer.id, fmsg.msg.ChannelID) return } // The channel initiator has responded with the funding outpoint of the // final funding transaction, as well as a signature for our version of // the commitment transaction. So at this point, we can validate the // inititator's commitment transaction, then send our own if it's valid. // TODO(roasbeef): make case (p vs P) consistent throughout fundingOut := fmsg.msg.FundingOutPoint chanID := fmsg.msg.ChannelID fndgLog.Infof("completing pendingID(%v) with ChannelPoint(%v)", chanID, fundingOut, ) revokeKey := fmsg.msg.RevocationKey obsfucator := fmsg.msg.StateHintObsfucator commitSig := fmsg.msg.CommitSignature.Serialize() // With all the necessary data available, attempt to advance the // funding workflow to the next stage. If this succeeds then the // funding transaction will broadcast after our next message. err = resCtx.reservation.CompleteReservationSingle(revokeKey, fundingOut, commitSig, obsfucator) if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) fmsg.peer.Disconnect() return } // With their signature for our version of the commitment transaction // verified, we can now send over our signature to the remote peer. // TODO(roasbeef): just have raw bytes in wire msg? avoids decoding // then decoding shortly afterwards. _, sig := resCtx.reservation.OurSignatures() ourCommitSig, err := btcec.ParseSignature(sig, btcec.S256()) if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) return } // Register a new barrier for this channel to properly synchronize with // the peer's readHandler once the channel is open. fmsg.peer.barrierInits <- *fundingOut fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", fmsg.msg.ChannelID, fundingOut) signComplete := lnwire.NewSingleFundingSignComplete(chanID, ourCommitSig) fmsg.peer.queueMsg(signComplete, nil) } // processFundingSignComplete sends a single funding sign complete message // along with the source peer to the funding manager. func (f *fundingManager) processFundingSignComplete(msg *lnwire.SingleFundingSignComplete, peer *peer) { f.fundingMsgs <- &fundingSignCompleteMsg{msg, peer} } // handleFundingSignComplete processes the final message received in a single // funder workflow. Once this message is processed, the funding transaction is // broadcast. Once the funding transaction reaches a sufficient number of // confirmations, a message is sent to the responding peer along with an SPV // proofs of transaction inclusion. func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) { chanID := fmsg.msg.ChannelID peerID := fmsg.peer.id resCtx, err := f.getReservationCtx(peerID, chanID) if err != nil { fndgLog.Warnf("can' find reservation (peerID:%v, chanID:%v)", peerID, chanID) return } // The remote peer has responded with a signature for our commitment // transaction. We'll verify the signature for validity, then commit // the state to disk as we can now open the channel. commitSig := fmsg.msg.CommitSignature.Serialize() if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil { fndgLog.Errorf("unable to complete reservation sign complete: %v", err) fmsg.peer.Disconnect() resCtx.err <- err return } fundingPoint := resCtx.reservation.FundingOutpoint() fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+ "waiting for channel open on-chain", chanID, fundingPoint) // Send an update to the upstream client that the negotiation process // is over. // TODO(roasbeef): add abstraction over updates to accomdate // long-polling, or SSE, etc. resCtx.updates <- &lnrpc.OpenStatusUpdate{ Update: &lnrpc.OpenStatusUpdate_ChanPending{ ChanPending: &lnrpc.PendingUpdate{ Txid: fundingPoint.Hash[:], }, }, } // Spawn a goroutine which will send the newly open channel to the // source peer once the channel is open. A channel is considered "open" // once it reaches a sufficient number of confirmations. // TODO(roasbeef): semaphore to limit active chan open goroutines go func() { select { // TODO(roasbeef): need to persist pending broadcast channels, // send chan open proof during scan of blocks mined while down. case openChan := <-resCtx.reservation.DispatchChan(): // This reservation is no longer pending as the funding // transaction has been fully confirmed. f.deleteReservationCtx(peerID, chanID) fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active", fundingPoint, peerID) // Now that the channel is open, we need to notify a // number of parties of this event. // First we send the newly opened channel to the source // server peer. fmsg.peer.newChannels <- openChan // Afterwards we send the breach arbiter the new // channel so it can watch for attempts to breach the // channel's contract by the remote party. f.breachAribter.newContracts <- openChan // Next, we queue a message to notify the remote peer // that the channel is open. We additionally provide an // SPV proof allowing them to verify the transaction // inclusion. // TODO(roasbeef): obtain SPV proof from sub-system. // * ChainNotifier constructs proof also? spvProof := []byte("fake proof") fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, spvProof) fmsg.peer.queueMsg(fundingOpen, nil) // Register the new link with the L3 routing manager // so this new channel can be utilized during path // finding. chanInfo := openChan.StateSnapshot() capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance) pubSerialized := fmsg.peer.addr.IdentityKey.SerializeCompressed() fmsg.peer.server.routingMgr.OpenChannel( graph.NewVertex(pubSerialized), graph.NewEdgeID(*fundingPoint), &graph.ChannelInfo{ Cpt: capacity, }, ) // Finally give the caller a final update notifying // them that the channel is now open. // TODO(roasbeef): helper funcs for proto construction resCtx.updates <- &lnrpc.OpenStatusUpdate{ Update: &lnrpc.OpenStatusUpdate_ChanOpen{ ChanOpen: &lnrpc.ChannelOpenUpdate{ ChannelPoint: &lnrpc.ChannelPoint{ FundingTxid: fundingPoint.Hash[:], OutputIndex: fundingPoint.Index, }, }, }, } return case <-f.quit: return } }() } // processFundingOpenProof sends a message to the fundingManager allowing it // to process the final message recieved when the daemon is on the responding // side of a single funder channel workflow. func (f *fundingManager) processFundingOpenProof(msg *lnwire.SingleFundingOpenProof, peer *peer) { f.fundingMsgs <- &fundingOpenMsg{msg, peer} } // handleFundingOpen processes the final message when the daemon is the // responder to a single funder channel workflow. The SPV proofs supplied by // the initiating node is verified, which if correct, marks the channel as open // to the source peer. func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { chanID := fmsg.msg.ChannelID peerID := fmsg.peer.id resCtx, err := f.getReservationCtx(peerID, chanID) if err != nil { fndgLog.Warnf("can' find reservation (peerID:%v, chanID:%v)", peerID, chanID) return } // The channel initiator has claimed the channel is now open, so we'll // verify the contained SPV proof for validity. // TODO(roasbeef): send off to the spv proof verifier, in the routing // sub-module. // Now that we've verified the initiator's proof, we'll commit the // channel state to disk, and notify the source peer of a newly opened // channel. openChan, err := resCtx.reservation.FinalizeReservation() if err != nil { fndgLog.Errorf("unable to finalize reservation: %v", err) fmsg.peer.Disconnect() return } // The reservation has been completed, therefore we can stop tracking // it within our active reservations map. f.deleteReservationCtx(peerID, chanID) fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerID(%v) is now open", resCtx.reservation.FundingOutpoint, peerID) // Notify the L3 routing manager of the newly active channel link. capacity := int64(resCtx.reservation.OurContribution().FundingAmount + resCtx.reservation.TheirContribution().FundingAmount) vertex := fmsg.peer.addr.IdentityKey.SerializeCompressed() fmsg.peer.server.routingMgr.OpenChannel( graph.NewVertex(vertex), graph.NewEdgeID(*resCtx.reservation.FundingOutpoint()), &graph.ChannelInfo{ Cpt: capacity, }, ) // Send the newly opened channel to the breach arbiter to it can watch // for uncopperative channel breaches, potentially punishing the // counter-party for attempting to cheat us. f.breachAribter.newContracts <- openChan // Finally, notify the target peer of the newly open channel. fmsg.peer.newChannels <- openChan } // initFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. // TODO(roasbeef): re-visit blocking nature.. func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) { f.fundingRequests <- &initFundingMsg{ peer: targetPeer, openChanReq: req, } } // handleInitFundingMsg creates a channel reservation within the daemon's // wallet, then sends a funding request to the remote peer kicking off the // funding workflow. func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { var ( // TODO(roasbeef): add delay nodeID = msg.peer.addr.IdentityKey localAmt = msg.localFundingAmt remoteAmt = msg.remoteFundingAmt capacity = localAmt + remoteAmt numConfs = msg.numConfs ourDustLimit = lnwallet.DefaultDustLimit() ) fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ "capacity=%v, numConfs=%v, addr=%v, dustLimit=%v)", localAmt, remoteAmt, ourDustLimit, capacity, numConfs, msg.peer.addr.Address) // Initialize a funding reservation with the local wallet. If the // wallet doesn't have enough funds to commit to this channel, then // the request will fail, and be aborted. reservation, err := f.wallet.InitChannelReservation(capacity, localAmt, nodeID, msg.peer.addr.Address, uint16(numConfs), 4, ourDustLimit) if err != nil { msg.err <- err return } // Obtain a new pending channel ID which is used to track this // reservation throughout its lifetime. msg.peer.pendingChannelMtx.Lock() chanID := msg.peer.nextPendingChannelID msg.peer.nextPendingChannelID++ msg.peer.pendingChannelMtx.Unlock() // If a pending channel map for this peer isn't already created, then // we create one, ultimately allowing us to track this pending // reservation within the target peer. f.resMtx.Lock() if _, ok := f.activeReservations[msg.peer.id]; !ok { f.activeReservations[msg.peer.id] = make(pendingChannels) } f.activeReservations[msg.peer.id][chanID] = &reservationWithCtx{ reservation: reservation, peer: msg.peer, updates: msg.updates, err: msg.err, } f.resMtx.Unlock() // Once the reservation has been created, and indexed, queue a funding // request to the remote peer, kicking off the funding workflow. contribution := reservation.OurContribution() deliveryScript, err := txscript.PayToAddrScript(contribution.DeliveryAddress) if err != nil { fndgLog.Errorf("Unable to convert address to pkscript: %v", err) msg.err <- err return } fndgLog.Infof("Starting funding workflow with for pendingID(%v)", chanID) // TODO(roasbeef): add FundingRequestFromContribution func // TODO(roasbeef): need to set fee/kb fundingReq := lnwire.NewSingleFundingRequest( chanID, msg.channelType, msg.coinType, 0, // TODO(roasbeef): grab from fee estimation model capacity, contribution.CsvDelay, contribution.CommitKey, contribution.MultiSigKey, deliveryScript, ourDustLimit, ) msg.peer.queueMsg(fundingReq, nil) } // processErrorGeneric sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric, peer *peer) { f.fundingMsgs <- &fundingErrorMsg{err, peer} } // handleErrorGenericMsg process the error which was received from remote peer, // depends on the type of error we should do different clean up steps and // inform user about it. func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) { e := fmsg.err switch e.Code { case lnwire.ErrorMaxPendingChannels: peerID := fmsg.peer.id chanID := fmsg.err.PendingChannelID if ctx, err := f.cancelReservationCtx(peerID, chanID); err != nil { fndgLog.Warnf("unable to delete reservation: %v", err) return } else { ctx.err <- grpc.Errorf(e.Code.ToGrpcCode(), e.Problem) return } default: fndgLog.Warnf("unknown funding error (%v:%v)", e.Code, e.Problem) } } // cancelReservationCtx do all needed work in order to securely cancel the // reservation. func (f *fundingManager) cancelReservationCtx(peerID int32, chanID uint64) (*reservationWithCtx, error) { ctx, err := f.getReservationCtx(peerID, chanID) if err != nil { return nil, errors.Errorf("can't find reservation: %v", err) } if err := ctx.reservation.Cancel(); err != nil { ctx.err <- err return nil, errors.Errorf("can't cancel reservation: %v", err) } f.deleteReservationCtx(peerID, chanID) return ctx, nil } // deleteReservationCtx is needed in order to securely delete the reservation. func (f *fundingManager) deleteReservationCtx(peerID int32, chanID uint64) { // TODO(roasbeef): possibly cancel funding barrier in peer's // channelManager? f.resMtx.Lock() delete(f.activeReservations[peerID], chanID) f.resMtx.Unlock() } // getReservationCtx returns the reservation context by peer id and channel id. func (f *fundingManager) getReservationCtx(peerID int32, chanID uint64) (*reservationWithCtx, error) { f.resMtx.RLock() resCtx, ok := f.activeReservations[peerID][chanID] f.resMtx.RUnlock() if !ok { return nil, errors.Errorf("unknown channel (id: %v)", chanID) } return resCtx, nil }