discovery+server: make waiting proofs persistent

In this commit waiting proofs array have been replaced with persistant
boltd storage which removes the possibility for the half proof to be
lost during half proof exchange.
This commit is contained in:
Andrey Samokhvalov 2017-05-05 20:17:31 +03:00 committed by Olaoluwa Osuntokun
parent a3eee453c3
commit 8e4199ee92
4 changed files with 165 additions and 85 deletions

View File

@ -16,17 +16,6 @@ import (
"github.com/roasbeef/btcutil"
)
// waitingProofKey is the proof key which uniquely identifies the announcement
// signature message. The goal of this key is distinguish the local and remote
// proof for the same channel id.
//
// TODO(andrew.shvv) move to the channeldb package after waiting proof map
// becomes persistent.
type waitingProofKey struct {
chanID uint64
isRemote bool
}
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
@ -79,18 +68,27 @@ type Config struct {
// network the pending batch of new announcements we've received since
// the last trickle tick.
TrickleDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in
// waiting proof storage to make waiting proofs persistent.
DB *channeldb.DB
}
// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration paramters.
func New(cfg Config) (*AuthenticatedGossiper, error) {
storage, err := channeldb.NewWaitingProofStore(cfg.DB)
if err != nil {
return nil, err
}
return &AuthenticatedGossiper{
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
syncRequests: make(chan *syncRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: make(map[waitingProofKey]*lnwire.AnnounceSignatures),
waitingProofs: storage,
}, nil
}
@ -110,7 +108,7 @@ type AuthenticatedGossiper struct {
quit chan struct{}
wg sync.WaitGroup
// cfg is a copy of the configuration struct that the discovery service
// cfg is a copy of the configuration struct that the gossiper service
// was initialized with.
cfg *Config
@ -128,17 +126,15 @@ type AuthenticatedGossiper struct {
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// waitingProofs is a map of partial channel proof announcement
// messages. We use this map to buffer half of the material needed to
// reconstruct a full authenticated channel announcement. Once we
// receive the other half the channel proof, we'll be able to properly
// validate it an re-broadcast it out to the network.
//
// TODO(andrew.shvv) make this map persistent.
waitingProofs map[waitingProofKey]*lnwire.AnnounceSignatures
// waitingProofs is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement. Once
// we receive the other half the channel proof, we'll be able to
// properly validate it an re-broadcast it out to the network.
waitingProofs *channeldb.WaitingProofStore
// networkMsgs is a channel that carries new network broadcasted
// message from outside the discovery service to be processed by the
// message from outside the gossiper service to be processed by the
// networkHandler.
networkMsgs chan *networkMsg
@ -415,7 +411,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
nodePub, err)
}
// The discovery has been signalled to exit, to we exit our
// The gossiper has been signalled to exit, to we exit our
// main loop so the wait group can be decremented.
case <-d.quit:
return
@ -689,8 +685,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
if err != nil {
// TODO(andrew.shvv) this is dangerous because remote
// node might rewrite the waiting proof.
key := newProofKey(shortChanID, nMsg.isRemote)
d.waitingProofs[key] = msg
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
if err := d.waitingProofs.Add(proof); err != nil {
err := errors.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
log.Infof("Orphan %v proof announcement with "+
"short_chan_id=%v, adding"+
@ -720,11 +723,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// announcement. If we didn't receive the opposite half of the
// proof than we should store it this one, and wait for
// opposite to be received.
oppositeKey := newProofKey(chanInfo.ChannelID, !nMsg.isRemote)
oppositeProof, ok := d.waitingProofs[oppositeKey]
if !ok {
key := newProofKey(chanInfo.ChannelID, nMsg.isRemote)
d.waitingProofs[key] = msg
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey())
if err != nil && err != channeldb.ErrWaitingProofNotFound {
err := errors.Errorf("unable to get "+
"the opposite proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
if err == channeldb.ErrWaitingProofNotFound {
if err := d.waitingProofs.Add(proof); err != nil {
err := errors.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
// If proof was sent by a local sub-system, then we'll
// send the announcement signature to the remote node
@ -805,7 +823,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
nMsg.err <- err
return nil
}
delete(d.waitingProofs, oppositeKey)
if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil {
err := errors.Errorf("unable remove opposite proof "+
"for the channel with chanID=%v: %v", msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
// Proof was successfully created and now can announce the
// channel to the remain network.

View File

@ -13,6 +13,9 @@ import (
"time"
"io/ioutil"
"os"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@ -55,6 +58,31 @@ var (
proofMatureDelta uint32
)
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*channeldb.DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
}
return cdb, cleanUp, nil
}
type mockSigner struct {
privKey *btcec.PrivateKey
}
@ -369,7 +397,7 @@ func createRemoteChannelAnnouncement(blockHeight uint32) (*lnwire.ChannelAnnounc
}
type testCtx struct {
discovery *AuthenticatedGossiper
gossiper *AuthenticatedGossiper
router *mockGraphSource
notifier *mockNotifier
broadcastedMessage chan lnwire.Message
@ -383,8 +411,13 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
notifier := newMockNotifier()
router := newMockRouter(startHeight)
db, cleanUpDb, err := makeTestDB()
if err != nil {
return nil, nil, err
}
broadcastedMessage := make(chan lnwire.Message, 10)
discovery, err := New(Config{
gossiper, err := New(Config{
Notifier: notifier,
Broadcast: func(_ *btcec.PublicKey, msgs ...lnwire.Message) error {
for _, msg := range msgs {
@ -398,22 +431,26 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
Router: router,
TrickleDelay: trickleDelay,
ProofMatureDelta: proofMatureDelta,
DB: db,
})
if err != nil {
cleanUpDb()
return nil, nil, fmt.Errorf("unable to create router %v", err)
}
if err := discovery.Start(); err != nil {
if err := gossiper.Start(); err != nil {
cleanUpDb()
return nil, nil, fmt.Errorf("unable to start router: %v", err)
}
cleanUp := func() {
discovery.Stop()
gossiper.Stop()
cleanUpDb()
}
return &testCtx{
router: router,
notifier: notifier,
discovery: discovery,
gossiper: gossiper,
broadcastedMessage: broadcastedMessage,
}, cleanUp, nil
}
@ -428,7 +465,7 @@ func TestProcessAnnouncement(t *testing.T) {
defer cleanup()
// Create node valid, signed announcement, process it with with
// discovery service, check that valid announcement have been
// gossiper service, check that valid announcement have been
// propagated farther into the lightning network, and check that we
// added new node into router.
na, err := createNodeAnnouncement(nodeKeyPriv1)
@ -436,7 +473,7 @@ func TestProcessAnnouncement(t *testing.T) {
t.Fatalf("can't create node announcement: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(na, na.NodeID)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
@ -459,7 +496,7 @@ func TestProcessAnnouncement(t *testing.T) {
t.Fatalf("can't create channel announcement: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
@ -482,7 +519,7 @@ func TestProcessAnnouncement(t *testing.T) {
t.Fatalf("can't create update announcement: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
@ -523,7 +560,7 @@ func TestPrematureAnnouncement(t *testing.T) {
}
select {
case <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID):
case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
@ -542,7 +579,7 @@ func TestPrematureAnnouncement(t *testing.T) {
}
select {
case <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID):
case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
@ -596,7 +633,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -606,7 +643,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -616,7 +653,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -628,7 +665,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -639,11 +676,21 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
case <-time.After(2 * trickleDelay):
}
if len(ctx.discovery.waitingProofs) != 1 {
t.Fatal("local proof announcement should be stored")
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -655,6 +702,20 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// TestOrphanSignatureAnnouncement ensures that the gossiper properly
@ -678,18 +739,28 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
// manager, thereby kick off the announcement exchange process, in
// this case the announcement should be added in the orphan batch
// because we haven't announce the channel yet.
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
if err != nil {
t.Fatalf("unable to proceed announcement: %v", err)
}
if len(ctx.discovery.waitingProofs) != 1 {
t.Fatal("local proof announcement should be stored")
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -700,7 +771,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -710,7 +781,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey)
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -722,7 +793,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
// After that we process local announcement, and waiting to receive
// the channel announcement.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
@ -735,7 +806,17 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
}
}
if len(ctx.discovery.waitingProofs) != 0 {
t.Fatal("index should be removed")
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("wrong number of objects in storage")
}
}

View File

@ -1,8 +1,6 @@
package discovery
import (
"encoding/binary"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
@ -10,31 +8,6 @@ import (
"github.com/roasbeef/btcd/btcec"
)
// newProofKey constructs a new announcement signature message key.
func newProofKey(chanID uint64, isRemote bool) waitingProofKey {
return waitingProofKey{
chanID: chanID,
isRemote: isRemote,
}
}
// ToBytes returns a serialized representation of the key.
func (k waitingProofKey) ToBytes() []byte {
var key [9]byte
var b uint8
if k.isRemote {
b = 0
} else {
b = 1
}
binary.BigEndian.PutUint64(key[:8], k.chanID)
key[8] = b
return key[:]
}
// createChanAnnouncement is a helper function which creates all channel
// announcements given the necessary channel related database items. This
// function is used to transform out databse structs into the coresponding wire

View File

@ -254,6 +254,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
SendToPeer: s.sendToPeer,
TrickleDelay: time.Millisecond * 300,
ProofMatureDelta: 0,
DB: chanDB,
})
if err != nil {
return nil, err