diff --git a/fundingmanager.go b/fundingmanager.go index b888c673..43e25672 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "sync" "sync/atomic" @@ -174,7 +173,6 @@ out: for { select { case msg := <-f.fundingMsgs: - fmt.Println("got funding msg: %v", msg) switch fmsg := msg.(type) { case *fundingRequestMsg: f.handleFundingRequest(fmsg) diff --git a/htlcswitch.go b/htlcswitch.go new file mode 100644 index 00000000..5fd300c9 --- /dev/null +++ b/htlcswitch.go @@ -0,0 +1,5 @@ +package main + +// HtlcSwitch... +type HtlcSwitch struct { +} diff --git a/lnrpc/rpc.proto b/lnrpc/rpc.proto index b2c246c4..e8249401 100644 --- a/lnrpc/rpc.proto +++ b/lnrpc/rpc.proto @@ -4,6 +4,7 @@ package lnrpc; service Lightning { rpc SendMany(SendManyRequest) returns (SendManyResponse); + rpc NewAddress(NewAddressRequest) returns (NewAddressResponse); rpc ConnectPeer(ConnectPeerRequest) returns (ConnectPeerResponse); @@ -97,6 +98,31 @@ message ListPeersRequest {} message ListPeersResponse { repeated Peer peers = 1; } + +message OpenChannelRequest { + int32 target_peer_id = 1; + LightningAddress target_node = 2; + + int64 local_funding_amount = 3; + int64 remote_funding_amount = 4; + + int64 commission_size = 5; + + uint32 num_confs = 6; +} +message OpenChannelResponse { + ChannelPoint channel_point = 1; +} + +message CloseChannelRequest { + ChannelPoint channel_point = 1; + int64 time_limit = 2; + bool allow_force_close = 3; +} +message CloseChannelResponse { + bool success = 1; +} + message WalletBalanceRequest { bool witness_only = 1; } diff --git a/peer.go b/peer.go index 142b381a..733a7f88 100644 --- a/peer.go +++ b/peer.go @@ -7,9 +7,14 @@ import ( "sync/atomic" "time" + "github.com/btcsuite/fastsha256" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" ) @@ -17,78 +22,129 @@ var ( numNodes int32 ) -// channelState... -type channelState uint8 - const ( - // TODO(roasbeef): others?? - channelPending channelState = iota - channelOpen - channelClosed - channelDispute - channelPendingPayment -) - -const ( - numAllowedRetransmits = 5 - pingInterval = 1 * time.Minute + // pingInterval is the interval at which ping messages are sent. + pingInterval = 30 * time.Second + // outgoingQueueLen is the buffer size of the channel which houses + // messages to be sent across the wire, requested by objects outside + // this struct. outgoingQueueLen = 50 ) -// outgoinMsg... +// outgoinMsg packages an lnwire.Message to be sent out on the wire, along with +// a buffered channel which will be sent upon once the write is complete. This +// buffered channel acts as a semaphore to be used for synchronization purposes. type outgoinMsg struct { msg lnwire.Message - sentChan chan struct{} + sentChan chan struct{} // MUST be buffered. } -// peer... -// inspired by btcd/peer.go +// peer is an active peer on the Lightning Network. This struct is responsible +// for managing any channel state related to this peer. To do so, it has several +// helper goroutines to handle events such as HTLC timeouts, new funding +// workflow, and detecting an uncooperative closure of any active channels. type peer struct { - // only to be used atomically + // MUST be used atomically. started int32 connected int32 disconnect int32 conn net.Conn - lightningAddr lndc.LNAdr + lightningAddr *lndc.LNAdr + lightningID wire.ShaHash + inbound bool protocolVersion uint32 - peerId int32 + id int32 // For purposes of detecting retransmits, etc. lastNMessages map[lnwire.Message]struct{} + // This mutex protects all the stats below it. sync.RWMutex - timeConnected time.Time - lastSend time.Time - lastRecv time.Time + timeConnected time.Time + lastSend time.Time + lastRecv time.Time + + // The following fields are only meant to be used *atomically* bytesReceived uint64 bytesSent uint64 satoshisSent uint64 satoshisReceived uint64 - // TODO(roasbeef): pings?? - sendQueueSync chan struct{} + // chainNet is the Bitcoin network to which this peer is anchored to. + chainNet wire.BitcoinNet + + // sendQueue is the channel which is used to queue outgoing to be + // written onto the wire. Note that this channel is unbuffered. + sendQueue chan outgoinMsg + + // outgoingQueue is a buffered channel which allows second/third party + // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoinMsg - sendQueue chan outgoinMsg - // Only will be set if the channel is in the 'pending' state. - reservation *lnwallet.ChannelReservation + // sendQueueSync is used as a semaphore to synchronize writes between + // the writeHandler and the queueHandler. + sendQueueSync chan struct{} - lnChannel *lnwallet.LightningChannel + // activeChannels is a map which stores the state machines of all + // active channels. Channels are indexed into the map by the txid of + // the funding transaction which opened the channel. + activeChannels map[wire.OutPoint]*lnwallet.LightningChannel + + // newChanBarriers is a map from a channel point to a 'barrier' which + // will be signalled once the channel is fully open. This barrier acts + // as a synchronization point for any incoming/outgoing HTLCs before + // the channel has been fully opened. + // TODO(roasbeef): barrier to sync chan open and handling of first htlc + // message. + newChanBarriers map[wire.OutPoint]chan struct{} + + // newChannels is used by the fundingManager to send fully opened + // channels to the source peer which handled the funding workflow. + // TODO(roasbeef): barrier to block until chan open before update + newChannels chan *lnwallet.LightningChannel + + // localCloseChanReqs is a channel in which any local requests to + // close a particular channel are sent over. + localCloseChanReqs chan *closeChanReq + + // remoteCloseChanReqs is a channel in which any remote requests + // (initiated by the remote peer) close a particular channel are sent + // over. + remoteCloseChanReqs chan *lnwire.CloseRequest + + // nextPendingChannelID is an integer which represents the id of the + // next pending channel. Pending channels are tracked by this id + // throughout their lifetime until they become active channels, or are + // cancelled. Channels id's initiated by an outbound node start from 0, + // while channels inititaed by an inbound node start from 2^63. In + // either case, this value is always monotonically increasing. + nextPendingChannelID uint64 + pendingChannelMtx sync.RWMutex + + server *server queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup } -// newPeer... -func newPeer(conn net.Conn, server *server) *peer { - return &peer{ - conn: conn, - peerId: atomic.AddInt32(&numNodes, 1), +// newPeer creates a new peer from an establish connection object, and a +// pointer to the main server. +func newPeer(conn net.Conn, server *server, net wire.BitcoinNet, inbound bool) (*peer, error) { + nodePub := conn.(*lndc.LNDConn).RemotePub + + p := &peer{ + conn: conn, + lightningID: wire.ShaHash(fastsha256.Sum256(nodePub.SerializeCompressed())), + id: atomic.AddInt32(&numNodes, 1), + chainNet: net, + inbound: inbound, + + server: server, lastNMessages: make(map[lnwire.Message]struct{}), @@ -96,26 +152,91 @@ func newPeer(conn net.Conn, server *server) *peer { sendQueue: make(chan outgoinMsg, 1), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), + newChanBarriers: make(map[wire.OutPoint]chan struct{}), + activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), + newChannels: make(chan *lnwallet.LightningChannel, 1), + + localCloseChanReqs: make(chan *closeChanReq), + remoteCloseChanReqs: make(chan *lnwire.CloseRequest), + queueQuit: make(chan struct{}), quit: make(chan struct{}), } + + // Initiate the pending channel identifier properly depending on if this + // node is inbound or outbound. This value will be used in an increasing + // manner to track pending channels. + if inbound { + p.nextPendingChannelID = 1 << 63 + } else { + p.nextPendingChannelID = 0 + } + + // Fetch and then load all the active channels we have with this + // remote peer from the database. + activeChans, err := server.chanDB.FetchOpenChannels(&p.lightningID) + if err != nil { + peerLog.Errorf("unable to fetch active chans "+ + "for peer %v: %v", p, err) + return nil, err + } + if err := p.loadActiveChannels(activeChans); err != nil { + return nil, err + } + + return p, nil } +// loadActiveChannels creates indexes within the peer for tracking all active +// channels returned by the database. +func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { + for _, dbChan := range chans { + chanID := dbChan.ChanID + lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet, + p.server.lnwallet.ChainNotifier, p.server.chanDB, dbChan) + if err != nil { + return err + } + + chanPoint := wire.OutPoint{ + Hash: chanID.Hash, + Index: chanID.Index, + } + p.activeChannels[chanPoint] = lnChan + peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) + + // Update the server's global channel index. + p.server.chanIndexMtx.Lock() + p.server.chanIndex[chanPoint] = p + p.server.chanIndexMtx.Unlock() + } + + return nil +} + +// Start starts all helper goroutines the peer needs for normal operations. +// In the case this peer has already beeen started, then this function is a +// noop. func (p *peer) Start() error { if atomic.AddInt32(&p.started, 1) != 1 { return nil } - // TODO(roasbeef): version handshake + peerLog.Tracef("peer %v starting", p) - p.wg.Add(3) - go p.inHandler() + p.wg.Add(5) + go p.readHandler() go p.queueHandler() - go p.outHandler() + go p.writeHandler() + go p.channelManager() + go p.htlcManager() return nil } +// Stop signals the peer for a graceful shutdown. All active goroutines will be +// signaled to wrap up any final actions. This function will also block until +// all goroutines have exited. func (p *peer) Stop() error { // If we're already disconnecting, just exit. if atomic.AddInt32(&p.disconnect, 1) != 1 { @@ -129,23 +250,39 @@ func (p *peer) Stop() error { // Signal all worker goroutines to gracefully exit. close(p.quit) + p.wg.Wait() return nil } -// readNextMessage... +// String returns the string representation of this peer. +func (p *peer) String() string { + return p.conn.RemoteAddr().String() +} + +// readNextMessage reads, and returns the next message on the wire along with +// any additional raw payload. func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { // TODO(roasbeef): use our own net magic? - _, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, wire.TestNet) + n, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, p.chainNet) + atomic.AddUint64(&p.bytesReceived, uint64(n)) if err != nil { return nil, nil, err } + // TODO(roasbeef): add message summaries + peerLog.Tracef("readMessage from %v: %v", p, newLogClosure(func() string { + return spew.Sdump(nextMsg) + })) + return nextMsg, rawPayload, nil } -// inHandler.. -func (p *peer) inHandler() { +// readHandler is responsible for reading messages off the wire in series, then +// properly dispatching the handling of the message to the proper sub-system. +// +// NOTE: This method MUST be run as a goroutine. +func (p *peer) readHandler() { // TODO(roasbeef): set timeout for initial channel request or version // exchange. @@ -153,34 +290,54 @@ out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() if err != nil { - // TODO(roasbeef): log error + peerLog.Infof("unable to read message: %v", err) break out } - // TODO(roasbeef): state-machine to track version exchange switch msg := nextMsg.(type) { - // TODO(roasbeef): cases + // TODO(roasbeef): consolidate into predicate (single vs dual) + case *lnwire.SingleFundingRequest: + p.server.fundingMgr.processFundingRequest(msg, p) + case *lnwire.SingleFundingResponse: + p.server.fundingMgr.processFundingResponse(msg, p) + case *lnwire.SingleFundingComplete: + p.server.fundingMgr.processFundingComplete(msg, p) + case *lnwire.SingleFundingSignComplete: + p.server.fundingMgr.processFundingSignComplete(msg, p) + case *lnwire.SingleFundingOpenProof: + p.server.fundingMgr.processFundingOpenProof(msg, p) + case *lnwire.CloseRequest: + p.remoteCloseChanReqs <- msg } } p.wg.Done() } -// writeMessage... +// writeMessage writes the target lnwire.Message to the remote peer. func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return nil } - _, err := lnwire.WriteMessage(p.conn, msg, 0, - wire.TestNet) + // TODO(roasbeef): add message summaries + peerLog.Tracef("writeMessage to %v: %v", p, newLogClosure(func() string { + return spew.Sdump(msg) + })) + + n, err := lnwire.WriteMessage(p.conn, msg, 0, p.chainNet) + atomic.AddUint64(&p.bytesSent, uint64(n)) return err } -// outHandler.. -func (p *peer) outHandler() { +// writeHandler is a goroutine dedicated to reading messages off of an incoming +// queue, and writing them out to the wire. This goroutine coordinates with the +// queueHandler in order to ensure the incoming message queue is quickly drained. +// +// NOTE: This method MUST be run as a goroutine. +func (p *peer) writeHandler() { // pingTicker is used to periodically send pings to the remote peer. pingTicker := time.NewTicker(pingInterval) defer pingTicker.Stop() @@ -195,15 +352,15 @@ out: if err := p.writeMessage(outMsg.msg); err != nil { // TODO(roasbeef): disconnect + peerLog.Errorf("unable to write message: %v", err) } - // Synchronize with the outHandler. + // Synchronize with the writeHandler. p.sendQueueSync <- struct{}{} case <-pingTicker.C: - // TODO(roasbeef): ping em + // TODO(roasbeef): move ping to time.AfterFunc case <-p.quit: break out - } } @@ -227,7 +384,10 @@ fin: p.wg.Done() } -// queueHandler.. +// queueHandler is responsible for accepting messages from outside sub-systems +// to be eventually sent out on the wire by the writeHandler. +// +// NOTE: This method MUST be run as a goroutine. func (p *peer) queueHandler() { waitOnSync := false pendingMsgs := list.New() @@ -244,14 +404,14 @@ out: case <-p.sendQueueSync: // If there aren't any more remaining messages in the // queue, then we're no longer waiting to synchronize - // with the outHandler. + // with the writeHandler. next := pendingMsgs.Front() if next == nil { waitOnSync = false continue } - // Notify the outHandler about the next item to + // Notify the writeHandler about the next item to // asynchronously send. val := pendingMsgs.Remove(next) p.sendQueue <- val.(outgoinMsg) @@ -264,3 +424,187 @@ out: close(p.queueQuit) p.wg.Done() } + +// queueMsg queues a new lnwire.Message to be eventually sent out on the +// wire. +func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { + p.outgoingQueue <- outgoinMsg{msg, doneChan} +} + +// channelManager is goroutine dedicated to handling all requests/signals +// pertaining to the opening, cooperative closing, and force closing of all +// channels maintained with the remote peer. +// +// NOTE: This method MUST be run as a goroutine. +func (p *peer) channelManager() { +out: + for { + select { + case newChan := <-p.newChannels: + chanPoint := newChan.ChannelPoint() + p.activeChannels[chanPoint] = newChan + + // TODO(roasbeef): signal channel barrier + peerLog.Infof("New channel active ChannelPoint(%v) "+ + "with peerId(%v)", chanPoint, p.id) + + // Now that the channel is open, update the server's + // map of channels to the peers we have a particular + // channel open to. + // TODO(roasbeef): should server have this knowledge? + p.server.chanIndexMtx.Lock() + p.server.chanIndex[chanPoint] = p + p.server.chanIndexMtx.Unlock() + case req := <-p.localCloseChanReqs: + p.handleLocalClose(req) + case req := <-p.remoteCloseChanReqs: + p.handleRemoteClose(req) + case <-p.quit: + break out + } + } + + p.wg.Done() +} + +// handleLocalClose kicks-off the workflow to execute a cooperative closure of +// the channel initiated by a local sub-system. +func (p *peer) handleLocalClose(req *closeChanReq) { + chanPoint := req.chanPoint + key := wire.OutPoint{ + Hash: chanPoint.Hash, + Index: chanPoint.Index, + } + channel := p.activeChannels[key] + + // Shift the channel state machine into a 'closing' state. This + // generates a signature for the closing tx, as well as a txid of the + // closing tx itself, allowing us to watch the network to determine + // when the remote node broadcasts the fully signed closing transaction. + sig, txid, err := channel.InitCooperativeClose() + if err != nil { + req.resp <- nil + req.err <- err + return + } + peerLog.Infof("Executing cooperative closure of "+ + "ChanPoint(%v) with %v, txid=%v", key, p.id, + txid) + + // With our signature for the close tx generated, send the signature + // to the remote peer instructing it to close this particular channel + // point. + // TODO(roasbeef): remove encoding redundancy + closeSig, err := btcec.ParseSignature(sig, btcec.S256()) + if err != nil { + req.resp <- nil + req.err <- err + return + } + closeReq := lnwire.NewCloseRequest(chanPoint, closeSig) + p.queueMsg(closeReq, nil) + + // Finally, launch a goroutine which will request to be notified by the + // ChainNotifier once the closure transaction obtains a single + // confirmation. + go func() { + // TODO(roasbeef): add param for num needed confs + notifier := p.server.lnwallet.ChainNotifier + confNtfn, _ := notifier.RegisterConfirmationsNtfn(txid, 1) + + var success bool + select { + case height, ok := <-confNtfn.Confirmed: + // In the case that the ChainNotifier is shutting + // down, all subscriber notification channels will be + // closed, generating a nil receive. + if !ok { + // TODO(roasbeef): check for nil elsewhere + return + } + + // The channel has been closed, remove it from any + // active indexes, and the database state. + peerLog.Infof("ChannelPoint(%v) is now "+ + "closed at height %v", key, height) + delete(p.activeChannels, key) + + p.server.chanIndexMtx.Lock() + delete(p.server.chanIndex, key) + p.server.chanIndexMtx.Unlock() + // TODO(roasbeef): wipe from DB + success = true + case <-p.quit: + } + + // Respond to the local sub-system which requested the channel + // closure. + req.resp <- &closeChanResp{success} + req.err <- nil + }() +} + +// handleRemoteClose completes a request for cooperative channel closure +// initiated by the remote node. +func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) { + chanPoint := req.ChannelPoint + key := wire.OutPoint{ + Hash: chanPoint.Hash, + Index: chanPoint.Index, + } + channel := p.activeChannels[key] + + // Now that we have their signature for the closure transaction, we + // can assemble the final closure transaction, complete with our + // signature. + sig := req.RequesterCloseSig + closeSig := append(sig.Serialize(), byte(txscript.SigHashAll)) + closeTx, err := channel.CompleteCooperativeClose(closeSig) + if err != nil { + peerLog.Errorf("unable to complete cooperative "+ + "close for ChannelPoint(%v): %v", + chanPoint, err) + // TODO(roasbeef): send ErrorGeneric to other side + return + } + + // Finally, broadcast the closure transaction, to the network. + peerLog.Infof("Broadcasting cooperative close tx: %v", newLogClosure(func() string { + return spew.Sdump(closeTx) + })) + if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil { + peerLog.Errorf("channel close tx from "+ + "ChannelPoint(%v) rejected: %v", + chanPoint, err) + // TODO(roasbeef): send ErrorGeneric to other side + return + } + + // TODO(roasbeef): also wait for confs before removing state + peerLog.Infof("ChannelPoint(%v) is now "+ + "closed", key) + delete(p.activeChannels, key) + + p.server.chanIndexMtx.Lock() + delete(p.server.chanIndex, key) + p.server.chanIndexMtx.Unlock() + // TODO(roasbeef): wipe from DB, with above in func +} + +// htlcManager... +// * communicates with the htlc switch over several channels +// * in handler sends to this goroutine after getting final revocation +// * has timeouts etc, to send back on queue handler in case of timeout +func (p *peer) htlcManager() { +out: + for { + select { + case <-p.quit: + break out + } + } + + p.wg.Done() +} + +// TODO(roasbeef): make all start/stop mutexes a CAS diff --git a/rpcserver.go b/rpcserver.go index ea536cc4..521d90bd 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -149,16 +149,66 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, rpcsLog.Debugf("Connected to peer: %v", peerAddr.String()) return &lnrpc.ConnectPeerResponse{peerID}, nil } + +// OpenChannel attempts to open a singly funded channel specified in the +// request to a remote peer. +func (r *rpcServer) OpenChannel(ctx context.Context, + in *lnrpc.OpenChannelRequest) (*lnrpc.OpenChannelResponse, error) { + + rpcsLog.Tracef("Recieved request to openchannel to peerid(%v) "+ + "allocation(us=%v, them=%v) numconfs=%v", in.TargetPeerId, + in.LocalFundingAmount, in.RemoteFundingAmount, in.NumConfs) + + localFundingAmt := btcutil.Amount(in.LocalFundingAmount) + remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount) + target := in.TargetPeerId + numConfs := in.NumConfs + resp, err := r.server.OpenChannel(target, localFundingAmt, + remoteFundingAmt, numConfs) if err != nil { + rpcsLog.Errorf("unable to open channel to peerid(%v): %v", + target, err) return nil, err } - if err := r.server.ConnectToPeer(peerAddr); err != nil { + rpcsLog.Tracef("Opened channel with peerid(%v), fundingtxid %v", + in.TargetPeerId, resp) + + return &lnrpc.OpenChannelResponse{ + &lnrpc.ChannelPoint{ + FundingTxid: resp.Hash[:], + OutputIndex: resp.Index, + }, + }, nil +} + +// CloseChannel attempts to close an active channel identified by its channel +// point. The actions of this method can additionally be augmented to attempt +// a force close after a timeout period in the case of an inactive peer. +func (r *rpcServer) CloseChannel(ctx context.Context, + in *lnrpc.CloseChannelRequest) (*lnrpc.CloseChannelResponse, error) { + + index := in.ChannelPoint.OutputIndex + txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid) + if err != nil { + rpcsLog.Errorf("(closechannel) invalid txid: %v", err) + return nil, err + } + targetChannelPoint := wire.NewOutPoint(txid, index) + + rpcsLog.Tracef("Recieved closechannel request for ChannelPoint(%v)", + targetChannelPoint) + + resp, err := r.server.CloseChannel(targetChannelPoint) + if err != nil { + rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v", + targetChannelPoint, err) return nil, err } - rpcsLog.Infof("Connected to peer: %v", peerAddr.String()) - return &lnrpc.ConnectPeerResponse{[]byte(peerAddr.String())}, nil + return &lnrpc.CloseChannelResponse{resp}, nil +} + // ListPeers returns a verbose listing of all currently active peers. func (r *rpcServer) ListPeers(ctx context.Context, in *lnrpc.ListPeersRequest) (*lnrpc.ListPeersResponse, error) { diff --git a/server.go b/server.go index d71c4547..22b6a140 100644 --- a/server.go +++ b/server.go @@ -210,6 +210,48 @@ type connectPeerMsg struct { type listPeersMsg struct { resp chan []*peer } + +// openChanReq is a message sent to the server in order to request the +// initiation of a channel funding workflow to the peer with the specified +// node ID. +type openChanReq struct { + targetNodeID int32 + targetNode *lndc.LNAdr + + // TODO(roasbeef): make enums in lnwire + channelType uint8 + coinType uint64 + + localFundingAmt btcutil.Amount + remoteFundingAmt btcutil.Amount + + numConfs uint32 + + resp chan *openChanResp + err chan error +} + +// openChanResp is the response to an openChanReq, it contains the channel +// point, or outpoint of the broadcast funding transaction. +type openChanResp struct { + chanPoint *wire.OutPoint +} + +// closeChanReq represents a request to close a particular channel specified +// by its outpoint. +type closeChanReq struct { + chanPoint *wire.OutPoint + + resp chan *closeChanResp + err chan error +} + +// closeChanResp is the response to a closeChanReq is simply houses a boolean +// value indicating if the channel coopertive channel closure was succesful or not. +type closeChanResp struct { + success bool +} + // queryHandler is a a goroutine dedicated to handling an queries or requests // to mutate the server's global state. // @@ -226,6 +268,10 @@ out: s.handleConnectPeer(msg) case *listPeersMsg: s.handleListPeers(msg) + case *openChanReq: + s.handleOpenChanReq(msg) + case *closeChanReq: + s.handleCloseChanReq(msg) } case <-s.quit: break out @@ -235,7 +281,8 @@ out: s.wg.Done() } -// handleListPeers... +// handleListPeers sends a lice of all currently active peers to the original +// caller. func (s *server) handleListPeers(msg *listPeersMsg) { peers := make([]*peer, 0, len(s.peers)) for _, peer := range s.peers { @@ -312,22 +359,118 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { msg.err <- nil }() } + +// handleOpenChanReq first locates the target peer, and if found hands off the +// request to the funding manager allowing it to initiate the channel funding +// workflow. +func (s *server) handleOpenChanReq(req *openChanReq) { + // First attempt to locate the target peer to open a channel with, if + // we're unable to locate the peer then this request will fail. + target := req.targetNodeID + var targetPeer *peer + for _, peer := range s.peers { // TODO(roasbeef): threadsafe api + // We found the the target + if target == peer.id { + targetPeer = peer + break + } } -} - + if targetPeer == nil { + req.resp <- nil + req.err <- fmt.Errorf("unable to find peer %v", target) return } - } + // Spawn a goroutine to send the funding workflow request to the funding + // manager. This allows the server to continue handling queries instead of + // blocking on this request which is exporeted as a synchronous request to + // the outside world. + go func() { + // TODO(roasbeef): server semaphore to restrict num goroutines + fundingID, err := s.fundingMgr.initFundingWorkflow(targetPeer, req) + req.resp <- &openChanResp{fundingID} + req.err <- err + }() } +// handleCloseChanReq sends a message to the peer responsible for the target +// channel point, instructing it to initiate a cooperative channel closure. +func (s *server) handleCloseChanReq(req *closeChanReq) { + s.chanIndexMtx.RLock() + key := wire.OutPoint{ + Hash: req.chanPoint.Hash, + Index: req.chanPoint.Index, + } + targetPeer, ok := s.chanIndex[key] + s.chanIndexMtx.RUnlock() + + if !ok { + req.resp <- nil + req.err <- fmt.Errorf("channel point %v not found", key) + return } - for _, listener := range s.listeners { + targetPeer.localCloseChanReqs <- req +} + +// ConnectToPeer requests that the server connect to a Lightning Network peer +// at the specified address. This function will *block* until either a +// connection is established, or the initial handshake process fails. +func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) { + reply := make(chan int32, 1) + errChan := make(chan error, 1) + + s.queries <- &connectPeerMsg{addr, reply, errChan} + + return <-reply, <-errChan +} + +// OpenChannel sends a request to the server to open a channel to the specified +// peer identified by ID with the passed channel funding paramters. +func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, + numConfs uint32) (*wire.OutPoint, error) { + + errChan := make(chan error, 1) + respChan := make(chan *openChanResp, 1) + + s.queries <- &openChanReq{ + targetNodeID: nodeID, + localFundingAmt: localAmt, + remoteFundingAmt: remoteAmt, + numConfs: numConfs, + + resp: respChan, + err: errChan, } + if err := <-errChan; err != nil { + return nil, err + } + + return (<-respChan).chanPoint, nil +} + +// CloseChannel attempts to close the channel identified by the specified +// outpoint in a coopertaive manner. +func (s *server) CloseChannel(channelPoint *wire.OutPoint) (bool, error) { + errChan := make(chan error, 1) + respChan := make(chan *closeChanResp, 1) + + s.queries <- &closeChanReq{ + chanPoint: channelPoint, + + resp: respChan, + err: errChan, + } + + if err := <-errChan; err != nil { + return false, err + } + + return (<-respChan).success, nil +} // Peers returns a slice of all active peers. func (s *server) Peers() []*peer {