diff --git a/fundingmanager.go b/fundingmanager.go index 55452a48..17007270 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -9,6 +9,7 @@ import ( "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" ) const ( @@ -26,6 +27,7 @@ const ( // * deadlines, etc. type reservationWithCtx struct { reservation *lnwallet.ChannelReservation + peer *peer resp chan *wire.OutPoint err chan error @@ -184,6 +186,30 @@ func (f *fundingManager) NumPendingChannels() uint32 { return <-resp } +type pendingChannel struct { + peerId int32 + lightningID [32]byte + channelPoint *wire.OutPoint + capacity btcutil.Amount + localBalance btcutil.Amount + remoteBalance btcutil.Amount +} + +type pendingChansReq struct { + resp chan []*pendingChannel +} + +// PendingChannels returns a slice describing all the channels which are +// currently pending at the last state of the funding workflow. +func (f *fundingManager) PendingChannels() []*pendingChannel { + resp := make(chan []*pendingChannel, 1) + + req := &pendingChansReq{resp} + f.queries <- req + + return <-resp +} + // reservationCoordinator is the primary goroutine tasked with progressing the // funding workflow between the wallet, and any outside peers or local callers. // @@ -210,11 +236,9 @@ out: case req := <-f.queries: switch msg := req.(type) { case *numPendingReq: - var numPending uint32 - for _, peerChannels := range f.activeReservations { - numPending += uint32(len(peerChannels)) - } - msg.resp <- numPending + f.handleNumPending(msg) + case *pendingChansReq: + f.handlePendingChannels(msg) } case <-f.quit: break out @@ -224,6 +248,41 @@ out: f.wg.Done() } +// handleNumPending handles a request for the total number of pending channels. +func (f *fundingManager) handleNumPending(msg *numPendingReq) { + var numPending uint32 + for _, peerChannels := range f.activeReservations { + numPending += uint32(len(peerChannels)) + } + msg.resp <- numPending +} + +// handlePendingChannels responds to a request for details concerning all +// currently pending channels waiting for the final phase of the funding +// workflow (funding txn confirmation). +func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { + var pendingChannels []*pendingChannel + for peerID, peerChannels := range f.activeReservations { + for _, pendingChan := range peerChannels { + peer := pendingChan.peer + res := pendingChan.reservation + localFund := res.OurContribution().FundingAmount + remoteFund := res.TheirContribution().FundingAmount + + pendingChan := &pendingChannel{ + peerId: peerID, + lightningID: peer.lightningID, + channelPoint: res.FundingOutpoint(), + capacity: localFund + remoteFund, + localBalance: localFund, + remoteBalance: remoteFund, + } + pendingChannels = append(pendingChannels, pendingChan) + } + } + msg.resp <- pendingChannels +} + // processFundingRequest sends a message to the fundingManager allowing it to // intiate the new funding workflow with the source peer. func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest, peer *peer) { @@ -233,7 +292,7 @@ func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest, // handleSingleFundingRequest creates an initial 'ChannelReservation' within // the wallet, then responds to the source peer with a single funder response // message progressing the funding workflow. -// TODO(roasbeef): add erorr chan to all, let channelManager handle +// TODO(roasbeef): add error chan to all, let channelManager handle // error+propagate func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { msg := fmsg.msg @@ -264,6 +323,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { } f.activeReservations[fmsg.peer.id][msg.ChannelID] = &reservationWithCtx{ reservation: reservation, + peer: fmsg.peer, } f.resMtx.Unlock() @@ -452,12 +512,6 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) return } - // This reservation is no longer pending as the funding transaction has been - // broadcast, so we can now delete it. - f.resMtx.Lock() - delete(f.activeReservations[fmsg.peer.id], chanID) - f.resMtx.Unlock() - fundingPoint := resCtx.reservation.FundingOutpoint() fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+ "waiting for channel open on-chain", chanID, fundingPoint) @@ -468,7 +522,16 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) go func() { // TODO(roasbeef): semaphore to limit active chan open goroutines select { + // TODO(roasbeef): need to persist pending broadcast channels, + // send chan open proof during scan of blocks mined while down. case openChan := <-resCtx.reservation.DispatchChan(): + + // This reservation is no longer pending as the funding + // transaction has been fully confirmed. + f.resMtx.Lock() + delete(f.activeReservations[fmsg.peer.id], chanID) + f.resMtx.Unlock() + fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active", fundingPoint, fmsg.peer.id) @@ -599,6 +662,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { } f.activeReservations[msg.peer.id][chanID] = &reservationWithCtx{ reservation: reservation, + peer: msg.peer, err: msg.err, resp: msg.resp, } diff --git a/rpcserver.go b/rpcserver.go index fba8d357..13e06c43 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -381,3 +381,40 @@ func (r *rpcServer) WalletBalance(ctx context.Context, return &lnrpc.WalletBalanceResponse{balance}, nil } + +// PendingChannels returns a list of all the channels that are currently +// considered "pending". A channel is pending if it has finished the funding +// workflow and is waiting for confirmations for the funding txn, or is in the +// process of closure, either initiated cooperatively or non-coopertively. +func (r *rpcServer) PendingChannels(ctx context.Context, + in *lnrpc.PendingChannelRequest) (*lnrpc.PendingChannelResponse, error) { + + both := in.Status == lnrpc.ChannelStatus_ALL + includeOpen := (in.Status == lnrpc.ChannelStatus_OPENING) || both + includeClose := (in.Status == lnrpc.ChannelStatus_CLOSING) || both + rpcsLog.Debugf("[pendingchannels] %v", in.Status) + + var pendingChannels []*lnrpc.PendingChannelResponse_PendingChannel + if includeOpen { + pendingOpenChans := r.server.fundingMgr.PendingChannels() + for _, pendingOpen := range pendingOpenChans { + // TODO(roasbeef): add confirmation progress + pendingChan := &lnrpc.PendingChannelResponse_PendingChannel{ + PeerId: pendingOpen.peerId, + LightningId: hex.EncodeToString(pendingOpen.lightningID[:]), + ChannelPoint: pendingOpen.channelPoint.String(), + Capacity: int64(pendingOpen.capacity), + LocalBalance: int64(pendingOpen.localBalance), + RemoteBalance: int64(pendingOpen.remoteBalance), + Status: lnrpc.ChannelStatus_OPENING, + } + pendingChannels = append(pendingChannels, pendingChan) + } + } + if includeClose { + } + + return &lnrpc.PendingChannelResponse{ + PendingChannels: pendingChannels, + }, nil +}