diff --git a/discovery/service.go b/discovery/service.go index 35ca2fdd..d71a237c 100644 --- a/discovery/service.go +++ b/discovery/service.go @@ -5,6 +5,8 @@ import ( "sync/atomic" "time" + "bytes" + "encoding/hex" "github.com/go-errors/errors" @@ -16,12 +18,23 @@ import ( "github.com/roasbeef/btcutil" ) +// waitingProofKey is the proof key which uniquely identifies the +// announcement signature message. The goal of this key is distinguish the +// local and remote proof for the same channel id. +// TODO(andrew.shvv) move to the channeldb package after waiting proof map +// becomes persistent. +type waitingProofKey struct { + chanID uint64 + isRemote bool +} + // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { msg lnwire.Message isRemote bool peer *btcec.PublicKey + err chan error } // syncRequest represents a request from an outside subsystem to the wallet to @@ -51,10 +64,19 @@ type Config struct { // indicates that the target peer should be excluded from the broadcast. Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error - // SendMessages is a function which allows the service to send a set of + // SendToPeer is a function which allows the service to send a set of // messages to a particular peer identified by the target public // key. - SendMessages func(target *btcec.PublicKey, msg ...lnwire.Message) error + SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error + + // ProofMatureDelta the number of confirmations which is needed + // before exchange the channel announcement proofs. + ProofMatureDelta uint32 + + // TrickleDelay the period of trickle timer which flushing to the + // network the pending batch of new announcements we've received since + // the last trickle tick. + TrickleDelay time.Duration } // New create new discovery service structure. @@ -79,6 +101,7 @@ func New(cfg Config) (*Discovery, error) { quit: make(chan bool), syncRequests: make(chan *syncRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), + waitingProofs: make(map[waitingProofKey]*lnwire.AnnounceSignatures), fakeSig: fakeSig, }, nil } @@ -104,16 +127,23 @@ type Discovery struct { // the main chain are sent over. newBlocks <-chan *chainntnfs.BlockEpoch - // prematureAnnouncements maps a blockheight to a set of announcements - // which are "premature" from our PoV. An message is premature if - // it claims to be anchored in a block which is beyond the current main - // chain tip as we know it. Premature network messages will be processed - // once the chain tip as we know it extends to/past the premature - // height. + // prematureAnnouncements maps a block height to a set of network + // messages which are "premature" from our PoV. An message is premature + // if it claims to be anchored in a block which is beyond the current + // main chain tip as we know it. Premature network messages will be + // processed once the chain tip as we know it extends to/past the + // premature height. // // TODO(roasbeef): limit premature networkMsgs to N prematureAnnouncements map[uint32][]*networkMsg + // waitingProofs is the map of proof announcement messages which were + // processed and waiting for opposite local or remote proof to be + // received in order to construct full proof, validate it and + // announce the channel. + // TODO(andrew.shvv) make this map persistent. + waitingProofs map[waitingProofKey]*lnwire.AnnounceSignatures + // networkMsgs is a channel that carries new network broadcasted // message from outside the discovery service to be processed by the // networkHandler. @@ -137,20 +167,22 @@ type Discovery struct { // Remote channel announcements should contain the announcement proof and be // fully validated. func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) error { + src *btcec.PublicKey) chan error { - aMsg := &networkMsg{ + nMsg := &networkMsg{ msg: msg, isRemote: true, peer: src, + err: make(chan error, 1), } select { - case d.networkMsgs <- aMsg: - return nil + case d.networkMsgs <- nMsg: case <-d.quit: - return errors.New("discovery has been shutted down") + nMsg.err <- errors.New("discovery has shut down") } + + return nMsg.err } // ProcessLocalAnnouncement sends a new remote announcement message along with @@ -160,20 +192,22 @@ func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message, // fully validated. The channels proofs will be included farther if nodes agreed // to announce this channel to the rest of the network. func (d *Discovery) ProcessLocalAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) error { + src *btcec.PublicKey) chan error { - aMsg := &networkMsg{ + nMsg := &networkMsg{ msg: msg, isRemote: false, peer: src, + err: make(chan error, 1), } select { - case d.networkMsgs <- aMsg: - return nil + case d.networkMsgs <- nMsg: case <-d.quit: - return errors.New("discovery has been shutted down") + nMsg.err <- errors.New("discovery has shut down") } + + return nMsg.err } // SynchronizeNode sends a message to the service indicating it should @@ -246,7 +280,7 @@ func (d *Discovery) networkHandler() { defer retransmitTimer.Stop() // TODO(roasbeef): parametrize the above - trickleTimer := time.NewTicker(time.Millisecond * 300) + trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() for { @@ -254,18 +288,18 @@ func (d *Discovery) networkHandler() { case announcement := <-d.networkMsgs: // Process the network announcement to determine if // this is either a new announcement from our PoV or an - // updates to a prior vertex/edge we previously - // accepted. - accepted := d.processNetworkAnnouncement(announcement) + // edges to a prior vertex/edge we previously + // proceeded. + emittedAnnouncements := d.processNetworkAnnouncement(announcement) - // If the updates was accepted, then add it to our next - // announcement batch to be broadcast once the trickle - // timer ticks gain. - if accepted { + // If the announcement was accepted, then add the + // emitted announcements to our announce batch to be + // broadcast once the trickle timer ticks gain. + if emittedAnnouncements != nil { // TODO(roasbeef): exclude peer that sent announcementBatch = append( announcementBatch, - announcement.msg, + emittedAnnouncements..., ) } @@ -294,12 +328,11 @@ func (d *Discovery) networkHandler() { } for _, ann := range prematureAnns { - accepted := d.processNetworkAnnouncement(ann) - - if accepted { + emittedAnnouncements := d.processNetworkAnnouncement(ann) + if emittedAnnouncements != nil { announcementBatch = append( announcementBatch, - ann.msg, + emittedAnnouncements..., ) } } @@ -309,7 +342,7 @@ func (d *Discovery) networkHandler() { // flush to the network the pending batch of new announcements // we've received since the last trickle tick. case <-trickleTimer.C: - // If the current announcement batch is nil, then we + // If the current announcements batch is nil, then we // have no further work here. if len(announcementBatch) == 0 { continue @@ -322,7 +355,8 @@ func (d *Discovery) networkHandler() { // them to all our immediately connected peers. err := d.cfg.Broadcast(nil, announcementBatch...) if err != nil { - log.Errorf("unable to send batch announcement: %v", err) + log.Errorf("unable to send batch "+ + "announcements: %v", err) continue } @@ -343,7 +377,7 @@ func (d *Discovery) networkHandler() { err := d.cfg.Router.ForAllOutgoingChannels( func(p *channeldb.ChannelEdgePolicy) error { c := &lnwire.ChannelUpdateAnnouncement{ - Signature: d.fakeSig, + Signature: p.Signature, ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID), Timestamp: uint32(p.LastUpdate.Unix()), Flags: p.Flags, @@ -381,8 +415,6 @@ func (d *Discovery) networkHandler() { // through the (subjectively) new information on their own. case syncReq := <-d.syncRequests: nodePub := syncReq.node.SerializeCompressed() - log.Infof("Synchronizing channel graph with %x", nodePub) - if err := d.synchronize(syncReq); err != nil { log.Errorf("unable to sync graph state with %x: %v", nodePub, err) @@ -397,23 +429,22 @@ func (d *Discovery) networkHandler() { } // processNetworkAnnouncement processes a new network relate authenticated -// channel or node announcement. If the updates didn't affect the internal state -// of the draft due to either being out of date, invalid, or redundant, then -// false is returned. Otherwise, true is returned indicating that the caller -// may want to batch this request to be broadcast to immediate peers during the -// next announcement epoch. -func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { - isPremature := func(chanID *lnwire.ShortChannelID) bool { - return chanID.BlockHeight > d.bestHeight +// channel or node announcement or announcements proofs. If the announcement +// didn't affect the internal state due to either being out of date, invalid, +// or redundant, then nil is returned. Otherwise, the set of announcements +// will be returned which should be broadcasted to the rest of the network. +func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message { + var announcements []lnwire.Message + isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { + return chanID.BlockHeight+delta > d.bestHeight } - switch msg := aMsg.msg.(type) { - + switch msg := nMsg.msg.(type) { // A new node announcement has arrived which either presents a new // node, or a node updating previously advertised information. case *lnwire.NodeAnnouncement: - if aMsg.isRemote { - // TODO(andrew.shvv) Add node validation + if nMsg.isRemote { + // TODO(andrew.shvv) add validation } node := &channeldb.LightningNode{ @@ -426,10 +457,25 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { } if err := d.cfg.Router.AddNode(node); err != nil { - log.Errorf("unable to add node: %v", err) - return false + e := errors.Errorf("unable to add node: %v", err) + if routing.IsError(err, + routing.ErrOutdated, + routing.ErrIgnored) { + log.Info(e) + } else { + log.Error(e) + } + nMsg.err <- e + return nil } + // Node announcement was successfully proceeded and know it + // might be broadcasted to other connected nodes. + announcements = append(announcements, msg) + + nMsg.err <- nil + return announcements + // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises // the existence of a channel and not yet the routing policies in @@ -438,7 +484,7 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. - if isPremature(&msg.ShortChannelID) { + if isPremature(msg.ShortChannelID, 0) { blockHeight := msg.ShortChannelID.BlockHeight log.Infof("Announcement for chan_id=(%v), is "+ "premature: advertises height %v, only height "+ @@ -447,21 +493,21 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], - aMsg, + nMsg, ) - return false + return nil } var proof *channeldb.ChannelAuthProof - if aMsg.isRemote { - // TODO(andrew.shvv) Add channel validation - } + if nMsg.isRemote { + // TODO(andrew.shvv) Add validation - proof = &channeldb.ChannelAuthProof{ - NodeSig1: msg.NodeSig1, - NodeSig2: msg.NodeSig2, - BitcoinSig1: msg.BitcoinSig1, - BitcoinSig2: msg.BitcoinSig2, + proof = &channeldb.ChannelAuthProof{ + NodeSig1: msg.NodeSig1, + NodeSig2: msg.NodeSig2, + BitcoinSig1: msg.BitcoinSig1, + BitcoinSig2: msg.BitcoinSig2, + } } edge := &channeldb.ChannelEdgeInfo{ @@ -474,45 +520,69 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { } if err := d.cfg.Router.AddEdge(edge); err != nil { - if !routing.IsError(err, routing.ErrOutdated) { - log.Errorf("unable to add edge: %v", err) + e := errors.Errorf("unable to add edge: %v", err) + if routing.IsError(err, + routing.ErrOutdated, + routing.ErrIgnored) { + log.Info(e) } else { - log.Info("Unable to add edge: %v", err) + log.Error(e) } - - return false + nMsg.err <- e + return nil } - // A new authenticated channel updates has arrived, this indicates + // Channel announcement was successfully proceeded and know it + // might be broadcasted to other connected nodes if it was + // announcement with proof (remote). + if proof != nil { + announcements = append(announcements, msg) + } + + nMsg.err <- nil + return announcements + + // A new authenticated channel edges has arrived, this indicates // that the directional information for an already known channel has // been updated. case *lnwire.ChannelUpdateAnnouncement: - chanID := msg.ShortChannelID.ToUint64() + blockHeight := msg.ShortChannelID.BlockHeight + shortChanID := msg.ShortChannelID.ToUint64() // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. - if isPremature(&msg.ShortChannelID) { - blockHeight := msg.ShortChannelID.BlockHeight - log.Infof("Update announcement for chan_id=(%v), is "+ - "premature: advertises height %v, only height "+ - "%v is known", chanID, blockHeight, - d.bestHeight) + if isPremature(msg.ShortChannelID, 0) { + log.Infof("Update announcement for "+ + "shortChanID=(%v), is premature: advertises "+ + "height %v, only height %v is known", + shortChanID, blockHeight, d.bestHeight) d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], - aMsg, + nMsg, ) - return false + return nil } - if aMsg.isRemote { - // TODO(andrew.shvv) Add update channel validation + // Get the node pub key as far as we don't have it in + // channel update announcement message and verify + // message signature. + chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + if err != nil { + err := errors.Errorf("unable to validate "+ + "channel update shortChanID=%v: %v", + shortChanID, err) + nMsg.err <- err + return nil } + // TODO(andrew.shvv) Add validation + // TODO(roasbeef): should be msat here update := &channeldb.ChannelEdgePolicy{ - ChannelID: chanID, + Signature: msg.Signature, + ChannelID: shortChanID, LastUpdate: time.Unix(int64(msg.Timestamp), 0), Flags: msg.Flags, TimeLockDelta: msg.TimeLockDelta, @@ -522,12 +592,200 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { } if err := d.cfg.Router.UpdateEdge(update); err != nil { - log.Errorf("unable to update edge: %v", err) - return false - } - } + e := errors.Errorf("unable to update edge: %v", err) + if routing.IsError(err, + routing.ErrOutdated, + routing.ErrIgnored) { + log.Info(e) + } else { + log.Error(e) + } - return true + nMsg.err <- e + return nil + } + + // Channel update announcement was successfully proceeded and + // know it might be broadcasted to other connected nodes. + // We should announce the edge to rest of the network only + // if channel has the authentication proof. + if chanInfo.AuthProof != nil { + announcements = append(announcements, msg) + } + + nMsg.err <- nil + return announcements + + // New signature announcement received which indicates willingness + // of the parties (to exchange the channel signatures / announce newly + // created channel). + case *lnwire.AnnounceSignatures: + needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta + shortChanID := msg.ShortChannelID.ToUint64() + prefix := "local" + if nMsg.isRemote { + prefix = "remote" + } + + // By the specification proof should be sent after some number of + // confirmations after channel was registered in bitcoin + // blockchain. So we should check that proof is premature and + // if not send it to the be proceeded again. This allows us to + // be tolerant to other clients if this constraint was changed. + if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) { + d.prematureAnnouncements[needBlockHeight] = append( + d.prematureAnnouncements[needBlockHeight], + nMsg, + ) + log.Infof("Premature proof annoucement, "+ + "current block height lower than needed: %v <"+ + " %v, add announcement to reprocessing batch", + d.bestHeight, needBlockHeight) + return nil + } + + // Check that we have channel with such channel id in out + // lightning network topology. + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + if err != nil { + err := errors.Errorf("unable to process channel "+ + "%v proof with shortChanID=%v: %v", prefix, + shortChanID, err) + nMsg.err <- err + return nil + } + + isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(), + chanInfo.NodeKey1.SerializeCompressed()) + isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(), + chanInfo.NodeKey2.SerializeCompressed()) + + // Check that channel that was retrieved belongs to the peer + // which sent the proof announcement, otherwise the proof for + // might be rewritten by the any lightning network node. + if !(isFirstNode || isSecondNode) { + err := errors.Errorf("channel that was received not "+ + "belongs to the peer which sent the proof, "+ + "shortChanID=%v", shortChanID) + log.Error(err) + nMsg.err <- err + return nil + } + + // Check that we received the opposite proof, if so, than we + // should construct the full proof, and create the channel + // announcement. If we didn't receive the opposite half of the + // proof than we should store it this one, and wait for opposite + // to be received. + oppositeKey := newProofKey(chanInfo.ChannelID, !nMsg.isRemote) + oppositeProof, ok := d.waitingProofs[oppositeKey] + if !ok { + key := newProofKey(chanInfo.ChannelID, nMsg.isRemote) + d.waitingProofs[key] = msg + + // If proof was send from funding manager than we + // should send the announce signature message to + // remote side. + if !nMsg.isRemote { + // Check that first node of the channel info + // corresponds to us. + var remotePeer *btcec.PublicKey + if isFirstNode { + remotePeer = chanInfo.NodeKey2 + } else { + remotePeer = chanInfo.NodeKey1 + } + + err := d.cfg.SendToPeer(remotePeer, msg) + if err != nil { + log.Errorf("unable to send "+ + "announcement message to "+ + "peer: %x", + remotePeer.SerializeCompressed()) + } + + log.Infof("Send channel announcement proof "+ + "for shortChanID=%v to remote peer: "+ + "%x", shortChanID, remotePeer.SerializeCompressed()) + } + + log.Infof("Incoming %v proof announcement for "+ + "shortChanID=%v have been proceeded and waiting for opposite proof", + prefix, shortChanID) + + nMsg.err <- nil + return nil + } + + var dbProof channeldb.ChannelAuthProof + if isFirstNode { + dbProof.NodeSig1 = msg.NodeSignature + dbProof.NodeSig2 = oppositeProof.NodeSignature + dbProof.BitcoinSig1 = msg.BitcoinSignature + dbProof.BitcoinSig2 = oppositeProof.BitcoinSignature + } else { + dbProof.NodeSig1 = oppositeProof.NodeSignature + dbProof.NodeSig2 = msg.NodeSignature + dbProof.BitcoinSig1 = oppositeProof.BitcoinSignature + dbProof.BitcoinSig2 = msg.BitcoinSignature + } + + chanAnn, e1Ann, e2Ann := createChanAnnouncement(&dbProof, chanInfo, e1, e2) + + // TODO(andrew.shvv) Add validation + + // If the channel was returned by the router it means that + // existence of funding point and inclusion of nodes bitcoin + // keys in it already checked by the router. On this stage we + // should check that node keys are corresponds to the bitcoin + // keys by validating the signatures of announcement. + // If proof is valid than we should populate the channel + // edge with it, so we can announce it on peer connect. + err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof) + if err != nil { + err := errors.Errorf("unable add proof to the "+ + "channel chanID=%v: %v", msg.ChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } + + // Proof was successfully created and now can announce the + // channel to the remain network. + log.Infof("Incoming %v proof announcement for shortChanID=%v"+ + " have been proceeded, adding channel announcement in"+ + " the broadcasting batch", prefix, shortChanID) + + announcements = append(announcements, chanAnn) + if e1Ann != nil { + announcements = append(announcements, e1Ann) + } + if e2Ann != nil { + announcements = append(announcements, e2Ann) + } + + if !nMsg.isRemote { + var remotePeer *btcec.PublicKey + if isFirstNode { + remotePeer = chanInfo.NodeKey2 + } else { + remotePeer = chanInfo.NodeKey1 + } + err = d.cfg.SendToPeer(remotePeer, msg) + if err != nil { + log.Errorf("unable to send announcement "+ + "message to peer: %x", + remotePeer.SerializeCompressed()) + } + } + + nMsg.err <- nil + return announcements + + default: + nMsg.err <- errors.New("wrong type of the announcement") + return nil + } } // synchronize attempts to synchronize the target node in the syncReq to @@ -555,7 +813,7 @@ func (d *Discovery) synchronize(syncReq *syncRequest) error { } ann := &lnwire.NodeAnnouncement{ - Signature: d.fakeSig, + Signature: node.AuthSig, Timestamp: uint32(node.LastUpdate.Unix()), Addresses: node.Addresses, NodeID: node.PubKey, @@ -573,71 +831,39 @@ func (d *Discovery) synchronize(syncReq *syncRequest) error { // With the vertexes gathered, we'll no retrieve the initial // announcement, as well as the latest channel update announcement for - // both of the directed edges that make up the channel. + // both of the directed infos that make up the channel. var numEdges uint32 if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, e1, e2 *channeldb.ChannelEdgePolicy) error { - - chanID := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID) - // First, using the parameters of the channel, along with the // channel authentication proof, we'll create re-create the // original authenticated channel announcement. - // TODO(andrew.shvv) skip if proof is nil - authProof := chanInfo.AuthProof - chanAnn := &lnwire.ChannelAnnouncement{ - NodeSig1: authProof.NodeSig1, - NodeSig2: authProof.NodeSig2, - ShortChannelID: chanID, - BitcoinSig1: authProof.BitcoinSig1, - BitcoinSig2: authProof.BitcoinSig2, - NodeID1: chanInfo.NodeKey1, - NodeID2: chanInfo.NodeKey2, - BitcoinKey1: chanInfo.BitcoinKey1, - BitcoinKey2: chanInfo.BitcoinKey2, - } - announceMessages = append(announceMessages, chanAnn) + if chanInfo.AuthProof != nil { + chanAnn, e1Ann, e2Ann := createChanAnnouncement( + chanInfo.AuthProof, chanInfo, e1, e2) - // Since it's up to a node's policy as to whether they - // advertise the edge in dire direction, we don't create an - // advertisement if the edge is nil. - if e1 != nil { - announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ - Signature: d.fakeSig, - ShortChannelID: chanID, - Timestamp: uint32(e1.LastUpdate.Unix()), - Flags: 0, - TimeLockDelta: e1.TimeLockDelta, - HtlcMinimumMsat: uint32(e1.MinHTLC), - FeeBaseMsat: uint32(e1.FeeBaseMSat), - FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths), - }) - } - if e2 != nil { - announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ - Signature: d.fakeSig, - ShortChannelID: chanID, - Timestamp: uint32(e2.LastUpdate.Unix()), - Flags: 1, - TimeLockDelta: e2.TimeLockDelta, - HtlcMinimumMsat: uint32(e2.MinHTLC), - FeeBaseMsat: uint32(e2.FeeBaseMSat), - FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths), - }) + announceMessages = append(announceMessages, chanAnn) + if e1Ann != nil { + announceMessages = append(announceMessages, e1Ann) + } + if e2Ann != nil { + announceMessages = append(announceMessages, e2Ann) + } + + numEdges++ } - numEdges++ return nil }); err != nil && err != channeldb.ErrGraphNoEdgesFound { - log.Errorf("unable to sync edges w/ peer: %v", err) + log.Errorf("unable to sync infos with peer: %v", err) return err } log.Infof("Syncing channel graph state with %x, sending %v "+ - "nodes and %v edges", targetNode.SerializeCompressed(), + "nodes and %v infos", targetNode.SerializeCompressed(), numNodes, numEdges) // With all the announcement messages gathered, send them all in a // single batch to the target peer. - return d.cfg.SendMessages(targetNode, announceMessages...) + return d.cfg.SendToPeer(targetNode, announceMessages...) } diff --git a/discovery/service_test.go b/discovery/service_test.go index a0b0885d..1de27f91 100644 --- a/discovery/service_test.go +++ b/discovery/service_test.go @@ -13,6 +13,7 @@ import ( "time" + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" @@ -33,18 +34,39 @@ var ( } _, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10) _, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10) + + inputStr = "147caa76786596590baa4e98f5d9f48b86c7765e489f7a6ff3360fe5c674360b" + sha, _ = chainhash.NewHashFromStr(inputStr) + outpoint = wire.NewOutPoint(sha, 0) + + bitcoinKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256()) + bitcoinKeyPub1 = bitcoinKeyPriv1.PubKey() + + nodeKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256()) + nodeKeyPub1 = nodeKeyPriv1.PubKey() + + bitcoinKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256()) + bitcoinKeyPub2 = bitcoinKeyPriv2.PubKey() + + nodeKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256()) + nodeKeyPub2 = nodeKeyPriv2.PubKey() + + trickleDelay = time.Millisecond * 300 + proofMatureDelta uint32 ) type mockGraphSource struct { nodes []*channeldb.LightningNode - edges []*channeldb.ChannelEdgeInfo - updates []*channeldb.ChannelEdgePolicy + infos map[uint64]*channeldb.ChannelEdgeInfo + edges map[uint64][]*channeldb.ChannelEdgePolicy bestHeight uint32 } func newMockRouter(height uint32) *mockGraphSource { return &mockGraphSource{ bestHeight: height, + infos: make(map[uint64]*channeldb.ChannelEdgeInfo), + edges: make(map[uint64][]*channeldb.ChannelEdgePolicy), } } @@ -55,13 +77,19 @@ func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { return nil } -func (r *mockGraphSource) AddEdge(edge *channeldb.ChannelEdgeInfo) error { - r.edges = append(r.edges, edge) +func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error { + if _, ok := r.infos[info.ChannelID]; ok { + return errors.New("info already exist") + } + r.infos[info.ChannelID] = info return nil } -func (r *mockGraphSource) UpdateEdge(policy *channeldb.ChannelEdgePolicy) error { - r.updates = append(r.updates, policy) +func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error { + r.edges[edge.ChannelID] = append( + r.edges[edge.ChannelID], + edge, + ) return nil } @@ -91,6 +119,28 @@ func (r *mockGraphSource) ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInf return nil } +func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( + *channeldb.ChannelEdgeInfo, + *channeldb.ChannelEdgePolicy, + *channeldb.ChannelEdgePolicy, error) { + + chanInfo, ok := r.infos[chanID.ToUint64()] + if !ok { + return nil, nil, nil, errors.New("can't find channel info") + } + + edges := r.edges[chanID.ToUint64()] + if len(edges) == 0 { + return chanInfo, nil, nil, nil + } + + if len(edges) == 1 { + return chanInfo, edges[0], nil, nil + } + + return chanInfo, edges[0], edges[1], nil +} + type mockNotifier struct { clientCounter uint32 epochClients map[uint32]chan *chainntnfs.BlockEpoch @@ -149,31 +199,87 @@ func (m *mockNotifier) Stop() error { return nil } -func createNodeAnnouncement() (*lnwire.NodeAnnouncement, +type annBatch struct { + nodeAnn1 *lnwire.NodeAnnouncement + nodeAnn2 *lnwire.NodeAnnouncement + localChanAnn *lnwire.ChannelAnnouncement + remoteChanAnn *lnwire.ChannelAnnouncement + chanUpdAnn *lnwire.ChannelUpdateAnnouncement + localProofAnn *lnwire.AnnounceSignatures + remoteProofAnn *lnwire.AnnounceSignatures +} + +func createAnnouncements(blockHeight uint32) (*annBatch, error) { + var err error + var batch annBatch + + batch.nodeAnn1, err = createNodeAnnouncement(nodeKeyPriv1) + if err != nil { + return nil, err + } + + batch.nodeAnn2, err = createNodeAnnouncement(nodeKeyPriv2) + if err != nil { + return nil, err + } + + batch.remoteChanAnn, err = createRemoteChannelAnnouncement(blockHeight) + if err != nil { + return nil, err + } + + batch.localProofAnn = &lnwire.AnnounceSignatures{ + NodeSignature: batch.remoteChanAnn.NodeSig1, + BitcoinSignature: batch.remoteChanAnn.BitcoinSig1, + } + + batch.remoteProofAnn = &lnwire.AnnounceSignatures{ + NodeSignature: batch.remoteChanAnn.NodeSig2, + BitcoinSignature: batch.remoteChanAnn.BitcoinSig2, + } + + batch.localChanAnn, err = createRemoteChannelAnnouncement(blockHeight) + if err != nil { + return nil, err + } + batch.localChanAnn.BitcoinSig1 = nil + batch.localChanAnn.BitcoinSig2 = nil + batch.localChanAnn.NodeSig1 = nil + batch.localChanAnn.NodeSig2 = nil + + batch.chanUpdAnn, err = createUpdateAnnouncement(blockHeight) + if err != nil { + return nil, err + } + + return &batch, nil + +} + +func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement, error) { - priv, err := btcec.NewPrivateKey(btcec.S256()) + + alias, err := lnwire.NewAlias("kek" + string(priv.Serialize())) if err != nil { return nil, err } - pub := priv.PubKey().SerializeCompressed() - - alias, err := lnwire.NewAlias("kek" + string(pub[:])) - if err != nil { - return nil, err - } - - return &lnwire.NodeAnnouncement{ + a := &lnwire.NodeAnnouncement{ + Signature: testSig, Timestamp: uint32(prand.Int31()), Addresses: testAddrs, NodeID: priv.PubKey(), Alias: alias, Features: testFeatures, - }, nil + } + + return a, nil } -func createUpdateAnnouncement(blockHeight uint32) *lnwire.ChannelUpdateAnnouncement { - return &lnwire.ChannelUpdateAnnouncement{ +func createUpdateAnnouncement(blockHeight uint32) (*lnwire.ChannelUpdateAnnouncement, + error) { + + a := &lnwire.ChannelUpdateAnnouncement{ Signature: testSig, ShortChannelID: lnwire.ShortChannelID{ BlockHeight: blockHeight, @@ -184,26 +290,37 @@ func createUpdateAnnouncement(blockHeight uint32) *lnwire.ChannelUpdateAnnouncem FeeBaseMsat: uint32(prand.Int31()), FeeProportionalMillionths: uint32(prand.Int31()), } + + return a, nil } -func createChannelAnnouncement(blockHeight uint32) *lnwire.ChannelAnnouncement { - // Our fake channel will be "confirmed" at height 101. - chanID := lnwire.ShortChannelID{ - BlockHeight: blockHeight, - TxIndex: 0, - TxPosition: 0, +func createRemoteChannelAnnouncement(blockHeight uint32) (*lnwire.ChannelAnnouncement, + error) { + + a := &lnwire.ChannelAnnouncement{ + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: blockHeight, + TxIndex: 0, + TxPosition: 0, + }, + NodeID1: nodeKeyPub1, + NodeID2: nodeKeyPub2, + BitcoinKey1: bitcoinKeyPub1, + BitcoinKey2: bitcoinKeyPub2, + + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, } - return &lnwire.ChannelAnnouncement{ - ShortChannelID: chanID, - } + return a, nil } type testCtx struct { - discovery *Discovery - router *mockGraphSource - notifier *mockNotifier - + discovery *Discovery + router *mockGraphSource + notifier *mockNotifier broadcastedMessage chan lnwire.Message } @@ -215,7 +332,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { notifier := newMockNotifier() router := newMockRouter(startHeight) - broadcastedMessage := make(chan lnwire.Message) + broadcastedMessage := make(chan lnwire.Message, 10) discovery, err := New(Config{ Notifier: notifier, Broadcast: func(_ *btcec.PublicKey, msgs ...lnwire.Message) error { @@ -224,8 +341,12 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { } return nil }, - SendMessages: nil, - Router: router, + SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error { + return nil + }, + Router: router, + TrickleDelay: trickleDelay, + ProofMatureDelta: proofMatureDelta, }) if err != nil { return nil, nil, fmt.Errorf("unable to create router %v", err) @@ -255,21 +376,23 @@ func TestProcessAnnouncement(t *testing.T) { } defer cleanup() - priv, err := btcec.NewPrivateKey(btcec.S256()) - if err != nil { - t.Fatalf("can't create node pub key: %v", err) - } - nodePub := priv.PubKey() - - na, err := createNodeAnnouncement() + // Create node valid, signed announcement, process it with with + // discovery service, check that valid announcement have been + // propagated farther into the lightning network, and check that we + // added new node into router. + na, err := createNodeAnnouncement(nodeKeyPriv1) if err != nil { t.Fatalf("can't create node announcement: %v", err) } - ctx.discovery.ProcessRemoteAnnouncement(na, nodePub) + + err = <-ctx.discovery.ProcessRemoteAnnouncement(na, na.NodeID) + if err != nil { + t.Fatalf("can't process remote announcement: %v", err) + } select { case <-ctx.broadcastedMessage: - case <-time.After(time.Second): + case <-time.After(2 * trickleDelay): t.Fatal("announcememt wasn't proceeded") } @@ -277,32 +400,54 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("node wasn't added to router: %v", err) } - ca := createChannelAnnouncement(0) - ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub) + // Pretending that we receive the valid channel announcement from + // remote side, and check that we broadcasted it to the our network, + // and added channel info in the router. + ca, err := createRemoteChannelAnnouncement(0) + if err != nil { + t.Fatalf("can't create channel announcement: %v", err) + } + + err = <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID) + if err != nil { + t.Fatalf("can't process remote announcement: %v", err) + } + select { case <-ctx.broadcastedMessage: - case <-time.After(time.Second): + case <-time.After(2 * trickleDelay): + t.Fatal("announcememt wasn't proceeded") + } + + if len(ctx.router.infos) != 1 { + t.Fatalf("edge wasn't added to router: %v", err) + } + + // Pretending that we received valid channel policy update from remote + // side, and check that we broadcasted it to the other network, and + // added updates to the router. + ua, err := createUpdateAnnouncement(0) + if err != nil { + t.Fatalf("can't create update announcement: %v", err) + } + + err = <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID) + if err != nil { + t.Fatalf("can't process remote announcement: %v", err) + } + + select { + case <-ctx.broadcastedMessage: + case <-time.After(2 * trickleDelay): t.Fatal("announcememt wasn't proceeded") } if len(ctx.router.edges) != 1 { - t.Fatalf("edge wasn't added to router: %v", err) - } - - ua := createUpdateAnnouncement(0) - ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub) - select { - case <-ctx.broadcastedMessage: - case <-time.After(time.Second): - t.Fatal("announcememt wasn't proceeded") - } - - if len(ctx.router.updates) != 1 { t.Fatalf("edge update wasn't added to router: %v", err) } } -// TestPrematureAnnouncement checks that premature networkMsgs are +// TestPrematureAnnouncement checks that premature announcements are // not propagated to the router subsystem until block with according // block height received. func TestPrematureAnnouncement(t *testing.T) { @@ -312,62 +457,150 @@ func TestPrematureAnnouncement(t *testing.T) { } defer cleanup() - priv, err := btcec.NewPrivateKey(btcec.S256()) + na, err := createNodeAnnouncement(nodeKeyPriv1) if err != nil { - t.Fatalf("can't create node pub key: %v", err) + t.Fatalf("can't create node announcement: %v", err) + } + + // Pretending that we receive the valid channel announcement from + // remote side, but block height of this announcement is greater than + // highest know to us, for that reason it should be added to the + // repeat/premature batch. + ca, err := createRemoteChannelAnnouncement(1) + if err != nil { + t.Fatalf("can't create channel announcement: %v", err) } - nodePub := priv.PubKey() - ca := createChannelAnnouncement(1) - ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub) select { - case <-ctx.broadcastedMessage: + case <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID): + t.Fatal("announcement was proceeded") + case <-time.After(100 * time.Millisecond): + } + + if len(ctx.router.infos) != 0 { + t.Fatal("edge was added to router") + } + + // Pretending that we receive the valid channel update announcement from + // remote side, but block height of this announcement is greater than + // highest know to us, for that reason it should be added to the + // repeat/premature batch. + ua, err := createUpdateAnnouncement(1) + if err != nil { + t.Fatalf("can't create update announcement: %v", err) + } + + select { + case <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID): t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } if len(ctx.router.edges) != 0 { - t.Fatal("edge was added to router") - } - - ua := createUpdateAnnouncement(1) - ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub) - select { - case <-ctx.broadcastedMessage: - t.Fatal("announcement was proceeded") - case <-time.After(100 * time.Millisecond): - } - - if len(ctx.router.updates) != 0 { t.Fatal("edge update was added to router") } + // Generate new block and waiting the previously added announcements + // to be proceeded. newBlock := &wire.MsgBlock{} ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), 1) select { case <-ctx.broadcastedMessage: - if err != nil { - t.Fatalf("announcememt was proceeded with err: %v", err) - } - case <-time.After(time.Second): - t.Fatal("announcememt wasn't proceeded") + case <-time.After(2 * trickleDelay): + t.Fatal("announcememt wasn't broadcasted") } - if len(ctx.router.edges) != 1 { + if len(ctx.router.infos) != 1 { t.Fatalf("edge was't added to router: %v", err) } select { case <-ctx.broadcastedMessage: - if err != nil { - t.Fatalf("announcememt was proceeded with err: %v", err) - } - case <-time.After(time.Second): - t.Fatal("announcememt wasn't proceeded") + case <-time.After(2 * trickleDelay): + t.Fatal("announcememt wasn't broadcasted") } - if len(ctx.router.updates) != 1 { + if len(ctx.router.edges) != 1 { t.Fatalf("edge update wasn't added to router: %v", err) } } + +// TestSignatureAnnouncement.... +func TestSignatureAnnouncement(t *testing.T) { + ctx, cleanup, err := createTestCtx(proofMatureDelta) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with + // channel between two nodes. + err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcasted") + case <-time.After(2 * trickleDelay): + } + + err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcasted") + case <-time.After(2 * trickleDelay): + } + + err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcasted") + case <-time.After(2 * trickleDelay): + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcasted") + case <-time.After(2 * trickleDelay): + } + + if len(ctx.discovery.waitingProofs) != 1 { + t.Fatal("local proof annoucement should be stored") + } + + err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcasted") + } + } +} diff --git a/discovery/utils.go b/discovery/utils.go new file mode 100644 index 00000000..53efc99b --- /dev/null +++ b/discovery/utils.go @@ -0,0 +1,93 @@ +package discovery + +import ( + "encoding/binary" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +// newProofKey constructs new announcement signature message key. +func newProofKey(chanID uint64, isRemote bool) waitingProofKey { + return waitingProofKey{ + chanID: chanID, + isRemote: isRemote, + } +} + +// ToBytes represents the key in the byte format. +func (k waitingProofKey) ToBytes() []byte { + var key [10]byte + + var b uint8 + if k.isRemote { + b = 0 + } else { + b = 1 + } + + binary.BigEndian.PutUint64(key[:], k.chanID) + key[9] = b + + return key[:] +} + +// createChanAnnouncement helper function which creates the channel announcement +// by the given channeldb objects. +func createChanAnnouncement(chanProof *channeldb.ChannelAuthProof, + chanInfo *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) ( + *lnwire.ChannelAnnouncement, + *lnwire.ChannelUpdateAnnouncement, + *lnwire.ChannelUpdateAnnouncement) { + // First, using the parameters of the channel, along with the + // channel authentication chanProof, we'll create re-create the + // original authenticated channel announcement. + chanID := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID) + chanAnn := &lnwire.ChannelAnnouncement{ + NodeSig1: chanProof.NodeSig1, + NodeSig2: chanProof.NodeSig2, + ShortChannelID: chanID, + BitcoinSig1: chanProof.BitcoinSig1, + BitcoinSig2: chanProof.BitcoinSig2, + NodeID1: chanInfo.NodeKey1, + NodeID2: chanInfo.NodeKey2, + BitcoinKey1: chanInfo.BitcoinKey1, + BitcoinKey2: chanInfo.BitcoinKey2, + } + + // We'll unconditionally queue the channel's existence chanProof as + // it will need to be processed before either of the channel + // update networkMsgs. + + // Since it's up to a node's policy as to whether they + // advertise the edge in dire direction, we don't create an + // advertisement if the edge is nil. + var edge1Ann, edge2Ann *lnwire.ChannelUpdateAnnouncement + if e1 != nil { + edge1Ann = &lnwire.ChannelUpdateAnnouncement{ + Signature: e1.Signature, + ShortChannelID: chanID, + Timestamp: uint32(e1.LastUpdate.Unix()), + Flags: 0, + TimeLockDelta: e1.TimeLockDelta, + HtlcMinimumMsat: uint32(e1.MinHTLC), + FeeBaseMsat: uint32(e1.FeeBaseMSat), + FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths), + } + } + if e2 != nil { + edge2Ann = &lnwire.ChannelUpdateAnnouncement{ + Signature: e2.Signature, + ShortChannelID: chanID, + Timestamp: uint32(e2.LastUpdate.Unix()), + Flags: 1, + TimeLockDelta: e2.TimeLockDelta, + HtlcMinimumMsat: uint32(e2.MinHTLC), + FeeBaseMsat: uint32(e2.FeeBaseMSat), + FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths), + } + } + + return chanAnn, edge1Ann, edge2Ann +} diff --git a/fundingmanager.go b/fundingmanager.go index 81343dca..af431342 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "encoding/hex" "sync" "sync/atomic" "time" @@ -141,9 +140,18 @@ type fundingConfig struct { // so that the channel creation process can be completed. Notifier chainntnfs.ChainNotifier + // SignNodeKey is used to generate signature for node public key with + // funding keys. + SignNodeKey func(nodeKey, fundingKey *btcec.PublicKey) (*btcec.Signature, error) + + // SignAnnouncement is used to generate the signatures for channel + // update, and node announcements, and also to generate the proof for + // the channel announcement. + SignAnnouncement func(msg lnwire.Message) (*btcec.Signature, error) + // SendToDiscovery is used by the FundingManager to announce newly created // channels to the rest of the Lightning Network. - SendToDiscovery func(msg lnwire.Message) + SendToDiscovery func(msg lnwire.Message) error // SendToPeer allows the FundingManager to send messages to the peer // node during the multiple steps involved in the creation of the @@ -202,8 +210,6 @@ type fundingManager struct { barrierMtx sync.RWMutex newChanBarriers map[wire.OutPoint]chan struct{} - fakeProof *channelProof - quit chan struct{} wg sync.WaitGroup } @@ -211,22 +217,8 @@ type fundingManager struct { // newFundingManager creates and initializes a new instance of the // fundingManager. func newFundingManager(cfg fundingConfig) (*fundingManager, error) { - // TODO(roasbeef): remove once we actually sign the funding_locked - // stuffs - s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" + - "1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" + - "ad154c03be696a292d24ae" - fakeSigHex, _ := hex.DecodeString(s) - fakeSig, _ := btcec.ParseSignature(fakeSigHex, btcec.S256()) - return &fundingManager{ - cfg: &cfg, - - fakeProof: &channelProof{ - nodeSig: fakeSig, - bitcoinSig: fakeSig, - }, - + cfg: &cfg, activeReservations: make(map[serializedPubKey]pendingChannels), newChanBarriers: make(map[wire.OutPoint]chan struct{}), fundingMsgs: make(chan interface{}, msgBufferSize), @@ -998,8 +990,9 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { // Register the new link with the L3 routing manager so this // new channel can be utilized during path // finding. - go f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey, channel, - fmsg.msg.ChannelID, f.fakeProof, f.fakeProof) + go f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey, + channel.LocalFundingKey, channel.RemoteFundingKey, + fmsg.msg.ChannelID, fundingPoint) } // channelProof is one half of the proof necessary to create an authenticated @@ -1013,8 +1006,9 @@ type channelProof struct { // chanAnnouncement encapsulates the two authenticated announcements that we // send out to the network after a new channel has been created locally. type chanAnnouncement struct { - chanAnn *lnwire.ChannelAnnouncement - edgeUpdate *lnwire.ChannelUpdateAnnouncement + chanAnn *lnwire.ChannelAnnouncement + chanUpdateAnn *lnwire.ChannelUpdateAnnouncement + chanProof *lnwire.AnnounceSignatures } // newChanAnnouncement creates the authenticated channel announcement messages @@ -1024,11 +1018,12 @@ type chanAnnouncement struct { // identity pub keys of both parties to the channel, and the second segment is // authenticated only by us and contains our directional routing policy for the // channel. -func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey, - channel *lnwallet.LightningChannel, chanID lnwire.ShortChannelID, - localProof, remoteProof *channelProof) *chanAnnouncement { +func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.PublicKey, + localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID, + chanPoint wire.OutPoint) (*chanAnnouncement, error) { + var err error - // The unconditional section of the announcement is the ChannelID + // The unconditional section of the announcement is the ShortChannelID // itself which compactly encodes the location of the funding output // within the blockchain. chanAnn := &lnwire.ChannelAnnouncement{ @@ -1045,30 +1040,22 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey, // nodes indicates which of the nodes is "first". If our serialized // identity key is lower than theirs then we're the "first" node and // second otherwise. - selfBytes := localIdentity.SerializeCompressed() - remoteBytes := remotePub.SerializeCompressed() + selfBytes := localPubKey.SerializeCompressed() + remoteBytes := remotePubKey.SerializeCompressed() if bytes.Compare(selfBytes, remoteBytes) == -1 { - chanAnn.NodeID1 = localIdentity - chanAnn.NodeID2 = remotePub - chanAnn.NodeSig1 = localProof.nodeSig - chanAnn.NodeSig2 = remoteProof.nodeSig - chanAnn.BitcoinSig1 = localProof.nodeSig - chanAnn.BitcoinSig2 = remoteProof.nodeSig - chanAnn.BitcoinKey1 = channel.LocalFundingKey - chanAnn.BitcoinKey2 = channel.RemoteFundingKey + chanAnn.NodeID1 = localPubKey + chanAnn.NodeID2 = remotePubKey + chanAnn.BitcoinKey1 = localFundingKey + chanAnn.BitcoinKey2 = remoteFundingKey // If we're the first node then update the chanFlags to // indicate the "direction" of the update. chanFlags = 0 } else { - chanAnn.NodeID1 = remotePub - chanAnn.NodeID2 = localIdentity - chanAnn.NodeSig1 = remoteProof.nodeSig - chanAnn.NodeSig2 = localProof.nodeSig - chanAnn.BitcoinSig1 = remoteProof.nodeSig - chanAnn.BitcoinSig2 = localProof.nodeSig - chanAnn.BitcoinKey1 = channel.RemoteFundingKey - chanAnn.BitcoinKey2 = channel.LocalFundingKey + chanAnn.NodeID1 = remotePubKey + chanAnn.NodeID2 = localPubKey + chanAnn.BitcoinKey1 = remoteFundingKey + chanAnn.BitcoinKey2 = localFundingKey // If we're the second node then update the chanFlags to // indicate the "direction" of the update. @@ -1077,7 +1064,6 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey, // TODO(roasbeef): add real sig, populate proper FeeSchema chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{ - Signature: localProof.nodeSig, ShortChannelID: chanID, Timestamp: uint32(time.Now().Unix()), Flags: chanFlags, @@ -1087,10 +1073,40 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey, FeeProportionalMillionths: 0, } - return &chanAnnouncement{ - chanAnn: chanAnn, - edgeUpdate: chanUpdateAnn, + chanUpdateAnn.Signature, err = f.cfg.SignAnnouncement(chanUpdateAnn) + if err != nil { + return nil, errors.Errorf("unable to generate channel "+ + "update announcement signature: %v", err) } + + // Channel proof should be announced in the separate message, so we + // already have the bitcoin signature but in order to construct + // the proof we also need node signature. Use message signer in order + // to sign the message with node private key. + nodeSig, err := f.cfg.SignAnnouncement(chanAnn) + if err != nil { + return nil, errors.Errorf("unable to generate node "+ + "signature for channel announcement: %v", err) + } + + bitcoinSig, err := f.cfg.SignNodeKey(localPubKey, localFundingKey) + if err != nil { + return nil, errors.Errorf("unable to generate bitcoin "+ + "signature for node public key: %v", err) + } + + proof := &lnwire.AnnounceSignatures{ + ChannelID: chanPoint, + ShortChannelID: chanID, + NodeSignature: nodeSig, + BitcoinSignature: bitcoinSig, + } + + return &chanAnnouncement{ + chanAnn: chanAnn, + chanUpdateAnn: chanUpdateAnn, + chanProof: proof, + }, nil } // announceChannel announces a newly created channel to the rest of the network @@ -1098,17 +1114,23 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey, // network to recognize the legitimacy of the channel. The crafted // announcements are then send to the channel router to handle broadcasting to // the network during its next trickle. -func (f *fundingManager) announceChannel(idKey, remoteIDKey *btcec.PublicKey, - channel *lnwallet.LightningChannel, chanID lnwire.ShortChannelID, localProof, - remoteProof *channelProof) { +func (f *fundingManager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey, + localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID, + chanPoint wire.OutPoint) { - // TODO(roasbeef): need a Signer.SignMessage method to finalize - // advertisements - chanAnnouncement := newChanAnnouncement(idKey, remoteIDKey, channel, chanID, - localProof, remoteProof) + ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey, localFundingKey, + remoteFundingKey, chanID, chanPoint) + if err != nil { + fndgLog.Errorf("can't generate channel announcement: %v", err) + return + } - f.cfg.SendToDiscovery(chanAnnouncement.chanAnn) - f.cfg.SendToDiscovery(chanAnnouncement.edgeUpdate) + fndgLog.Infof("Send channel, channel update, and proof announcements"+ + " for chanID=%v, shortChannelID=%v to discovery service", + chanPoint, chanID.ToUint64()) + f.cfg.SendToDiscovery(ann.chanAnn) + f.cfg.SendToDiscovery(ann.chanUpdateAnn) + f.cfg.SendToDiscovery(ann.chanProof) } // initFundingWorkflow sends a message to the funding manager instructing it diff --git a/peer.go b/peer.go index c348de5c..5783e0af 100644 --- a/peer.go +++ b/peer.go @@ -494,12 +494,15 @@ out: isChanUpdate = true targetChan = msg.ChannelPoint - case *lnwire.NodeAnnouncement, + case *lnwire.ChannelUpdateAnnouncement, *lnwire.ChannelAnnouncement, - *lnwire.ChannelUpdateAnnouncement: + *lnwire.NodeAnnouncement, + *lnwire.AnnounceSignatures: p.server.discoverSrv.ProcessRemoteAnnouncement(msg, p.addr.IdentityKey) + default: + peerLog.Errorf("unknown message received from peer "+"%v", p) } if isChanUpdate { diff --git a/server.go b/server.go index cbc028d6..abafa067 100644 --- a/server.go +++ b/server.go @@ -168,8 +168,23 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, selfAddrs = append(selfAddrs, addr) } + // TODO(roasbeef): remove once we actually sign the funding_locked + // stuffs + fakeSigHex, err := hex.DecodeString("30450221008ce2bc69281ce27da07e66" + + "83571319d18e949ddfa2965fb6caa1bf0314f882d70220299105481d63e0f" + + "4bc2a88121167221b6700d72a0ead154c03be696a292d24ae") + if err != nil { + return nil, err + } + + fakeSig, err := btcec.ParseSignature(fakeSigHex, btcec.S256()) + if err != nil { + return nil, err + } + chanGraph := chanDB.ChannelGraph() self := &channeldb.LightningNode{ + AuthSig: fakeSig, LastUpdate: time.Now(), Addresses: selfAddrs, PubKey: privKey.PubKey(), @@ -202,10 +217,12 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, } s.discoverSrv, err = discovery.New(discovery.Config{ - Broadcast: s.broadcastMessage, - Notifier: s.chainNotifier, - Router: s.chanRouter, - SendMessages: s.sendToPeer, + Broadcast: s.broadcastMessage, + Notifier: s.chainNotifier, + Router: s.chanRouter, + SendToPeer: s.sendToPeer, + TrickleDelay: time.Millisecond * 300, + ProofMatureDelta: 0, }) if err != nil { return nil, err @@ -218,8 +235,16 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, IDKey: s.identityPriv.PubKey(), Wallet: wallet, Notifier: s.chainNotifier, - SendToDiscovery: func(msg lnwire.Message) { - s.discoverSrv.ProcessLocalAnnouncement(msg, + SignNodeKey: func(nodeKey, fundingKey *btcec.PublicKey) (*btcec.Signature, + error) { + return fakeSig, nil + }, + SignAnnouncement: func(msg lnwire.Message) (*btcec.Signature, + error) { + return fakeSig, nil + }, + SendToDiscovery: func(msg lnwire.Message) chan error { + return s.discoverSrv.ProcessLocalAnnouncement(msg, s.identityPriv.PubKey()) }, ArbiterChan: s.breachArbiter.newContracts,