From fbb8f0bccedee6d211bdbdde2bee1780e83acaf4 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 26 Jan 2018 16:11:11 -0800 Subject: [PATCH] htlcswitch/sequencer: adds a persistent emitter of payment IDs --- htlcswitch/sequencer.go | 128 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 htlcswitch/sequencer.go diff --git a/htlcswitch/sequencer.go b/htlcswitch/sequencer.go new file mode 100644 index 00000000..7e128cc9 --- /dev/null +++ b/htlcswitch/sequencer.go @@ -0,0 +1,128 @@ +package htlcswitch + +import ( + "sync" + + "github.com/boltdb/bolt" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/channeldb" +) + +// defaultSequenceBatchSize specifies the window of sequence numbers that are +// allocated for each write to disk made by the sequencer. +const defaultSequenceBatchSize = 1000 + +// Sequencer emits sequence numbers for locally initiated HTLCs. These are +// only used internally for tracking pending payments, however they must be +// unique in order to avoid circuit key collision in the circuit map. +type Sequencer interface { + // NextID returns a unique sequence number for each invocation. + NextID() (uint64, error) +} + +var ( + // nextPaymentIDKey identifies the bucket that will keep track of the + // persistent sequence numbers for payments. + nextPaymentIDKey = []byte("next-payment-id-key") + + // ErrSequencerCorrupted signals that the persistence engine was not + // initialized, or has been corrupted since startup. + ErrSequencerCorrupted = errors.New( + "sequencer database has been corrupted") +) + +// persistentSequencer is a concrete implementation of IDGenerator, that uses +// channeldb to allocate sequence numbers. +type persistentSequencer struct { + db *channeldb.DB + + mu sync.Mutex + + nextID uint64 + horizonID uint64 +} + +// NewPersistentSequencer initializes a new sequencer using a channeldb backend. +func NewPersistentSequencer(db *channeldb.DB) (Sequencer, error) { + g := &persistentSequencer{ + db: db, + } + + // Ensure the database bucket is created before any updates are + // performed. + if err := g.initDB(); err != nil { + return nil, err + } + + return g, nil +} + +// NextID returns a unique sequence number for every invocation, persisting the +// assignment to avoid reuse. +func (s *persistentSequencer) NextID() (uint64, error) { + + // nextID will be the unique sequence number returned if no errors are + // encountered. + var nextID uint64 + + // If our sequence batch has not been exhausted, we can allocate the + // next identifier in the range. + s.mu.Lock() + defer s.mu.Unlock() + + if s.nextID < s.horizonID { + nextID = s.nextID + s.nextID++ + + return nextID, nil + } + + // Otherwise, our sequence batch has been exhausted. We use the last + // known sequence number on disk to mark the beginning of the next + // sequence batch, and allocate defaultSequenceBatchSize (1000) at a + // time. + // + // NOTE: This also will happen on the first invocation after startup, + // i.e. when nextID and horizonID are both 0. The next sequence batch to be + // allocated will start from the last known tip on disk, which is fine + // as we only require uniqueness of the allocated numbers. + var nextHorizonID uint64 + if err := s.db.Update(func(tx *bolt.Tx) error { + nextIDBkt := tx.Bucket(nextPaymentIDKey) + if nextIDBkt == nil { + return ErrSequencerCorrupted + } + + nextID = nextIDBkt.Sequence() + nextHorizonID = nextID + defaultSequenceBatchSize + + // Cannot fail when used in Update. + nextIDBkt.SetSequence(nextHorizonID) + + return nil + }); err != nil { + return 0, err + } + + // Never assign index zero, to avoid collisions with the EmptyKeystone. + if nextID == 0 { + nextID++ + } + + // If our batch sequence allocation succeed, update our in-memory values + // so we can continue to allocate sequence numbers without hitting disk. + // The nextID is incremented by one in memory so the in can be used + // issued directly on the next invocation. + s.nextID = nextID + 1 + s.horizonID = nextHorizonID + + return nextID, nil +} + +// initDB populates the bucket used to generate payment sequence numbers. +func (s *persistentSequencer) initDB() error { + return s.db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(nextPaymentIDKey) + return err + }) +}