diff --git a/nursery_store.go b/nursery_store.go index f346a831..3fdc5598 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -16,22 +16,11 @@ import ( // its contents, and persisting state transitions detected by the utxo nursery. type NurseryStore interface { - // Incubation Entry Points. - - // EnterCrib accepts a new htlc output that the nursery will incubate - // through its two-stage process of sweeping funds back to the user's - // wallet. These outputs are persisted in the nursery store's crib - // bucket, and will be revisited after the output's CLTV has expired. - EnterCrib(*babyOutput) error - - // EnterPreschool accepts a new commitment output that the nursery will - // incubate through a single stage before sweeping. Outputs are stored - // in the preschool bucket until the commitment transaction has been - // confirmed, at which point they will be moved to the kindergarten - // bucket. - EnterPreschool(*kidOutput) error - - // On-chain Driven State Transtitions. + // Incubate registers a commitment output and a slice of htlc outputs to + // be swept back into the user's wallet. The event is persisted to disk, + // such that the nursery can resume the incubation process after a + // potential crash. + Incubate(*kidOutput, []babyOutput) error // CribToKinder atomically moves a babyOutput in the crib bucket to the // kindergarten bucket. The now mature kidOutput contained in the @@ -44,40 +33,34 @@ type NurseryStore interface { // transaction. PreschoolToKinder(*kidOutput) error - // AwardDiplomas accepts a variadic number of kidOutputs from the - // kindergarten bucket, and removes their corresponding entries from the - // height and channel indexes. If this method detects that all outputs - // for a particular contract have been incubated, it returns the channel + // GraduateKinder accepts a slice of kidOutputs from the kindergarten + // bucket, and removes their corresponding entries from the height and + // channel indexes. If this method detects that all outputs for a + // particular contract have been incubated, it returns the channel // points that are ready to be marked as fully closed. // TODO: make this handle one output at a time? - AwardDiplomas(...kidOutput) ([]wire.OutPoint, error) + GraduateKinder([]kidOutput) error - IsMatureChannel(*wire.OutPoint) (bool, error) - - // TryFinalizeClass accepts a block height as a parameter and purges its + // FinalizeHeight accepts a block height as a parameter and purges its // persistent state for all outputs at that height. During a restart, // the utxo nursery will begin it's recovery procedure from the next // height that has yet to be finalized. This block height should lag // beyond the best height for this chain as a measure of reorg // protection. - TryFinalizeClass(height uint32) error + FinalizeHeight(height uint32) error - // State Bucket Enumeration. + // LastFinalizedHeight returns the last block height for which the + // nursery store has purged all persistent state. + LastFinalizedHeight() (uint32, error) - // FetchCribs returns a list of babyOutputs in the crib bucket whose + // FetchClass returns a list of babyOutputs in the crib bucket whose // CLTV delay expires at the provided block height. - FetchCribs(height uint32) ([]babyOutput, error) - - // FetchKindergartens returns a list of kidOutputs in the kindergarten - // bucket whose CSV delay expires at the provided block height. - FetchKindergartens(height uint32) ([]kidOutput, error) + FetchClass(height uint32) ([]kidOutput, []babyOutput, error) // FetchPreschools returns a list of all outputs currently stored in the // preschool bucket. FetchPreschools() ([]kidOutput, error) - // Channel Output Enumeration. - // ForChanOutputs iterates over all outputs being incubated for a // particular channel point. This method accepts a callback that allows // the caller to process each key-value pair. The key will be a prefixed @@ -85,13 +68,13 @@ type NurseryStore interface { // whose type should be inferred from the key's prefix. ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error + // IsMatureChannel determines the whether or not all of the outputs in a + // particular channel bucket have been marked as graduated. + IsMatureChannel(*wire.OutPoint) (bool, error) + + // RemoveChannel channel erases all entries from the channel bucket for + // the provided channel point. RemoveChannel(*wire.OutPoint) error - - // The Point of No Return. - - // LastFinalizedHeight returns the last block height for which the - // nursery store has purged all persistent state. - LastFinalizedHeight() (uint32, error) } // prefixChainKey creates the root level keys for the nursery store. The keys @@ -169,6 +152,10 @@ var ( // timelock. kndrPrefix = []byte("kndr") + // gradPrefix is the state prefix given to all outputs that have been + // completely incubated. Once all outputs have been marked as graduated, + // this serves as a persistent marker that the nursery should mark the + // channel fully closed in the channeldb. gradPrefix = []byte("grad") ) @@ -197,7 +184,7 @@ var ( // | // | The channel index contains a directory for each channel that has a // | non-zero number of outputs being tracked by the nursery store. -// | Inside each channel directory are files contains serialized spendable +// | Inside each channel directory are files containing serialized spendable // | outputs that are awaiting some state transition. The name of each file // | contains the outpoint of the spendable output in the file, and is // | prefixed with 4-byte state prefix, indicating whether the spendable @@ -206,12 +193,12 @@ var ( // | which is useful in constructing nursery reports. // | // ├── channel-index-key/ -// │   ├── / <- CHANNEL BUCKET +// │   ├── / <- CHANNEL BUCKET // | |   ├── : // | |   └── : -// │   ├── / +// │   ├── / // | |   └── : -// │   └── / +// │   └── / // |    ├── : // |    └── : // | @@ -219,13 +206,12 @@ var ( // | // | The height index contains a directory for each height at which the // | nursery still has uncompleted actions. If an output is a crib or -// | kindergarten output, -// | it will have an associated entry in the height index. Inside a -// | particular height directory, the structure is similar to that of the -// | channel index, containing multiple channel directories, each of which -// | contains subdirectories named with a prefixed outpoint belonging to -// | the channel. Enumerating these combinations yields a relative file -// | path: +// | kindergarten output, it will have an associated entry in the height +// | index. Inside a particular height directory, the structure is similar +// | to that of the channel index, containing multiple channel directories, +// | each of which contains subdirectories named with a prefixed outpoint +// | belonging to the channel. Enumerating these combinations yields a +// | relative file path: // | e.g. // // | that can be queried in the channel index to retrieve the serialized // | output. @@ -233,22 +219,22 @@ var ( // └── height-index-key/ //    ├── / <- HEIGHT BUCKET // |   └── / <- HEIGHT-CHANNEL BUCKET -// | |    ├── / <- PREFIXED OUTPOINT -// | |    └── / +// | |    ├── : "" <- PREFIXED OUTPOINT +// | |    └── : "" // |   └── / -// |    └── / +// |    └── : "" //    └── / //    └── / -//    └── / -//    └── / +//    └── : "" +//    └── : "" // nurseryStore is a concrete instantiation of a NurseryStore that is backed by // a channeldb.DB instance. type nurseryStore struct { - chainHash chainhash.Hash - pfxChainKey []byte + chainHash chainhash.Hash + db *channeldb.DB - db *channeldb.DB + pfxChainKey []byte } // newNurseryStore accepts a chain hash and a channeldb.DB instance, returning @@ -267,105 +253,33 @@ func newNurseryStore(chainHash *chainhash.Hash, return &nurseryStore{ chainHash: *chainHash, - pfxChainKey: pfxChainKey, db: db, + pfxChainKey: pfxChainKey, }, nil } -// Incubation Entry Points. - -// EnterCrib accepts a new htlc output that the nursery will incubate through -// its two-stage process of sweeping funds back to the user's wallet. These -// outputs are persisted in the nursery store's crib bucket, and will be -// revisited after the output's CLTV has expired. -func (ns *nurseryStore) EnterCrib(bby *babyOutput) error { +// Incubate initiates the incubation process for the CSV-delayed commitment +// output and any number of CLTV-delayed htlc outputs. +func (ns *nurseryStore) Incubate(kid *kidOutput, babies []babyOutput) error { return ns.db.Update(func(tx *bolt.Tx) error { - - // First, retrieve or create the channel bucket corresponding to - // the baby output's origin channel point. - chanPoint := bby.OriginChanPoint() - chanBucket, err := ns.createChannelBucket(tx, chanPoint) - if err != nil { - return err + // Store commitment output in preschool bucket if not nil. + if kid != nil { + if err := ns.enterPreschool(tx, kid); err != nil { + return err + } } - // Next, retrieve or create the height-channel bucket located in - // the height bucket corresponding to the baby output's CLTV - // expiry height. - hghtChanBucket, err := ns.createHeightChanBucket(tx, - bby.expiry, chanPoint) - if err != nil { - return err + // Add all htlc outputs to the crib bucket. + for _, baby := range babies { + if err := ns.enterCrib(tx, &baby); err != nil { + return err + } } - // Since we are inserting this output into the crib bucket, we - // create a key that prefixes the baby output's outpoint with - // the crib prefix. - pfxOutputKey, err := prefixOutputKey(cribPrefix, bby.OutPoint()) - if err != nil { - return err - } - - // Serialize the baby output so that it can be written to the - // underlying key-value store. - var babyBuffer bytes.Buffer - if err := bby.Encode(&babyBuffer); err != nil { - return err - } - babyBytes := babyBuffer.Bytes() - - // Now, insert the serialized output into its channel bucket - // under the prefixed key created above. - if err := chanBucket.Put(pfxOutputKey, babyBytes); err != nil { - return err - } - - // Finally, create a corresponding bucket in the height-channel - // bucket for this crib output. The existence of this bucket - // indicates that the serialized output can be retrieved from - // the channel bucket using the same prefix key. - _, err = hghtChanBucket.CreateBucketIfNotExists(pfxOutputKey) - - return err + return nil }) } -// EnterPreschool accepts a new commitment output that the nursery will -// incubate through a single stage before sweeping. Outputs are stored in the -// preschool bucket until the commitment transaction has been confirmed, at -// which point they will be moved to the kindergarten bucket. -func (ns *nurseryStore) EnterPreschool(kid *kidOutput) error { - return ns.db.Update(func(tx *bolt.Tx) error { - - // First, retrieve or create the channel bucket corresponding to - // the baby output's origin channel point. - chanPoint := kid.OriginChanPoint() - chanBucket, err := ns.createChannelBucket(tx, chanPoint) - if err != nil { - return err - } - - // Since the babyOutput is being inserted into the preschool - // bucket, we create a key that prefixes its outpoint with the - // preschool prefix. - pfxOutputKey, err := prefixOutputKey(psclPrefix, kid.OutPoint()) - if err != nil { - return err - } - - // Serialize the kidOutput and insert it into the channel - // bucket. - var kidBuffer bytes.Buffer - if err := kid.Encode(&kidBuffer); err != nil { - return err - } - - return chanBucket.Put(pfxOutputKey, kidBuffer.Bytes()) - }) -} - -// On-chain Drive State Transitions. - // CribToKinder atomically moves a babyOutput in the crib bucket to the // kindergarten bucket. The now mature kidOutput contained in the babyOutput // will be stored as it waits out the kidOutput's CSV delay. @@ -406,10 +320,22 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { // We successfully located an existing height chan // bucket at this babyOutput's expiry height, proceed by // removing it from the index. - err := hghtChanBucketCltv.DeleteBucket(pfxOutputKey) + err := hghtChanBucketCltv.Delete(pfxOutputKey) if err != nil { return err } + + // Since we removed a crib output from the height index, + // we opportunistically prune the height bucket + // corresponding to the babyOutput's CLTV delay. This + // allows us to clean up any persistent state as outputs + // are progressed through the incubation process. + pruned, err := ns.pruneHeight(tx, bby.expiry) + if err != nil && err != ErrBucketNotEmpty { + return err + } else if err == nil && pruned { + utxnLog.Infof("Height bucket %d pruned", bby.expiry) + } } // Since we are moving this output from the crib bucket to the @@ -449,25 +375,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { // height-channel bucket corresponding to its maturity height. // This informs the utxo nursery that it should attempt to spend // this output when the blockchain reaches the maturity height. - _, err = hghtChanBucketCsv.CreateBucketIfNotExists(pfxOutputKey) - if err != nil { - return err - } - - // Finally, since we removed a crib output from the height - // index, we opportunistically prune the height bucket - // corresponding to the babyOutput's CLTV delay. This allows us - // to clean up any persistent state as outputs are progressed - // through the incubation process. - err = ns.pruneHeight(tx, bby.expiry) - switch err { - case nil, ErrBucketDoesNotExist: - return nil - case ErrBucketNotEmpty: - return nil - default: - return err - } + return hghtChanBucketCsv.Put(pfxOutputKey, []byte{}) }) } @@ -550,7 +458,7 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { }) } -// AwardDiplomas accepts a list of kidOutputs in the kindergarten bucket, +// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket, // removing their corresponding entries from the height and channel indexes. // If this method detects that all outputs for a particular contract have been // incubated, it returns the channel points that are ready to be marked as @@ -560,16 +468,8 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { // empty. // 2) Prune the channel bucket belonging to the kid's origin channel point, if // it is empty. -func (ns *nurseryStore) AwardDiplomas( - kids ...kidOutput) ([]wire.OutPoint, error) { +func (ns *nurseryStore) GraduateKinder(kids []kidOutput) error { - // As we iterate over the kids, we will build a list of the channels - // which have been pruned entirely from the nursery store. We will - // return this list to the caller, the utxo nursery, so that it can - // proceed to mark the channels as closed. - // TODO(conner): write list of closed channels to separate bucket so - // that they can be replayed on restart? - var possibleCloseSet = make(map[wire.OutPoint]struct{}) if err := ns.db.Update(func(tx *bolt.Tx) error { for _, kid := range kids { @@ -578,40 +478,29 @@ func (ns *nurseryStore) AwardDiplomas( outpoint := kid.OutPoint() chanPoint := kid.OriginChanPoint() - // Remove output from kindergarten bucket. - + // Construct the key under which the output is currently + // stored height and channel indexes. pfxOutputKey, err := prefixOutputKey(kndrPrefix, outpoint) if err != nil { return err } + // Load the height-channel bucket, remove this output, + // and attempt to prune the bucket if it empty. hghtChanBucket := ns.getHeightChanBucket(tx, confHeight, chanPoint) if hghtChanBucket != nil { - if err := hghtChanBucket.DeleteBucket(pfxOutputKey); err != nil { + if err := hghtChanBucket.Delete(pfxOutputKey); err != nil { return err } // Attempt to prune the height bucket matching the kid // output's confirmation height if it contains no active // outputs. - err := ns.pruneHeight(tx, confHeight) - switch err { - case ErrBucketNotEmpty: - // Bucket still has active outputs, proceed to - // prune channel bucket. - - case ErrBucketDoesNotExist: - // Bucket was previously pruned by another - // graduating output. - - case nil: - // Bucket was pruned successfully and no errors - // were encounter. - utxnLog.Infof("Height bucket %d pruned", confHeight) - - default: - // Unexpected database error. + pruned, err := ns.pruneHeight(tx, confHeight) + if err != nil && err != ErrBucketNotEmpty { return err + } else if err == nil && pruned { + utxnLog.Infof("Height bucket %d pruned", confHeight) } } @@ -620,6 +509,7 @@ func (ns *nurseryStore) AwardDiplomas( return err } + // Remove previous output with kindergarten prefix. if err := chanBucket.Delete(pfxOutputKey); err != nil { return err } @@ -632,104 +522,75 @@ func (ns *nurseryStore) AwardDiplomas( return err } + // Insert serialized output into channel bucket using + // kindergarten-prefixed key. err = chanBucket.Put(pfxOutputKey, gradBuffer.Bytes()) if err != nil { return err } - // If we've arrived here, we have encountered no - // database errors and a bucket was either successfully - // pruned or already has been. Thus it is safe to add it - // to our set of closed channels to be closed, since - // these may need to be replayed to ensure the channel - // database is aware that incubation has completed. - possibleCloseSet[*chanPoint] = struct{}{} } return nil }); err != nil { - return nil, err + return err } - // Convert our set of channels to be closed into a list. - possibleCloses := make([]wire.OutPoint, 0, len(possibleCloseSet)) - for chanPoint := range possibleCloseSet { - possibleCloses = append(possibleCloses, chanPoint) - } - - utxnLog.Infof("Possible channels to be marked fully closed: %v", - possibleCloses) - - return possibleCloses, nil + return nil } -// TryFinalizeClass accepts a block height as a parameter and purges its +// FinalizeHeight accepts a block height as a parameter and purges its // persistent state for all outputs at that height. During a restart, the utxo // nursery will begin it's recovery procedure from the next height that has // yet to be finalized. -func (ns *nurseryStore) TryFinalizeClass(height uint32) error { +func (ns *nurseryStore) FinalizeHeight(height uint32) error { return ns.db.Update(func(tx *bolt.Tx) error { - utxnLog.Infof("Attempting to finalize class at height %v", height) - lastHeight, err := ns.getNextLastFinalizedHeight(tx, height) - if err != nil { + if err := ns.deleteHeightBucket(tx, height); err != nil { return err } - utxnLog.Infof("Finalizing class at height %v", lastHeight) - return ns.putLastFinalizedHeight(tx, lastHeight) + return ns.putLastFinalizedHeight(tx, height) }) } -// State Bucket Enumeration. - -// FetchCribs returns a list of babyOutputs in the crib bucket whose CLTV +// FetchClass returns a list of babyOutputs in the crib bucket whose CLTV // delay expires at the provided block height. -func (ns *nurseryStore) FetchCribs(height uint32) ([]babyOutput, error) { - // Construct a list of all babyOutputs that need TLC at the provided - // block height. +func (ns *nurseryStore) FetchClass(height uint32) ([]kidOutput, []babyOutput, error) { + // Construct list of all crib and kindergarten outputs that need TLC at + // the provided block height. + var kids []kidOutput var babies []babyOutput - if err := ns.forEachHeightPrefix(cribPrefix, height, - func(buf []byte) error { + if err := ns.db.View(func(tx *bolt.Tx) error { + if err := ns.forEachHeightPrefix(tx, cribPrefix, height, func(buf []byte) error { - // We will attempt to deserialize all outputs stored - // with the crib prefix into babyOutputs, since this is - // the expected type that would have been serialized - // previously. - var bby babyOutput - if err := bby.Decode(bytes.NewReader(buf)); err != nil { + // We will attempt to deserialize all outputs + // stored with the crib prefix into babyOutputs, + // since this is the expected type that would + // have been serialized previously. + var baby babyOutput + babyReader := bytes.NewReader(buf) + if err := baby.Decode(babyReader); err != nil { return err } // Append the deserialized object to our list of // babyOutputs. - babies = append(babies, bby) + babies = append(babies, baby) return nil }); err != nil { - return nil, err - } - - return babies, nil -} - -// FetchKindergartens returns a list of kidOutputs in the kindergarten bucket -// whose CSV delay expires at the provided block height. -func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) { - // Construct a list of all kidOutputs that mature at the provided block - // height. - utxnLog.Infof("Fetching kinders") - var kids []kidOutput - if err := ns.forEachHeightPrefix(kndrPrefix, height, - func(buf []byte) error { - utxnLog.Infof("Inside kinder") + return err + } + return ns.forEachHeightPrefix(tx, kndrPrefix, height, func(buf []byte) error { // We will attempt to deserialize all outputs stored // with the kindergarten prefix into kidOutputs, since // this is the expected type that would have been // serialized previously. var kid kidOutput - if err := kid.Decode(bytes.NewReader(buf)); err != nil { + kidReader := bytes.NewReader(buf) + if err := kid.Decode(kidReader); err != nil { return err } @@ -739,12 +600,13 @@ func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) { return nil - }); err != nil { - return nil, err - } - utxnLog.Infof("Returning kinders") + }) - return kids, nil + }); err != nil { + return nil, nil, err + } + + return kids, babies, nil } // FetchPreschools returns a list of all outputs currently stored in the @@ -827,8 +689,6 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { return kids, nil } -// Channel Output Enumberation. - // ForChanOutputs iterates over all outputs being incubated for a particular // channel point. This method accepts a callback that allows the caller to // process each key-value pair. The key will be a prefixed outpoint, and the @@ -843,150 +703,158 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint, return ns.forChanOutputs(tx, chanPoint, callback) }) } -func (ns *nurseryStore) forChanOutputs(tx *bolt.Tx, chanPoint *wire.OutPoint, - callback func([]byte, []byte) error) error { - chanBucket := ns.getChannelBucket(tx, chanPoint) - if chanBucket == nil { - return ErrContractNotFound +// errImmatureChannel signals that not all outputs in a channel bucket have +// graduated. +var errImmatureChannel = errors.New("channel has non-graduated outputs") + +// IsMatureChannel determines the whether or not all of the outputs in a +// particular channel bucket have been marked as graduated. +func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) { + if err := ns.db.View(func(tx *bolt.Tx) error { + // Iterate over the contents of the channel bucket, computing + // both total number of outputs, and those that have the grad + // prefix. + return ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error { + if string(pfxKey[:4]) != string(gradPrefix) { + utxnLog.Infof("Found non-graduated output: %x", pfxKey) + return errImmatureChannel + } + return nil + }) + + }); err != nil && err != errImmatureChannel { + return false, err + } else { + return err == nil, nil } - return chanBucket.ForEach(callback) } -func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) { - var isMature bool - if err := ns.db.View(func(tx *bolt.Tx) error { - var nOutputs, nGrads int - if err := ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error { - // Count total and number of grad outputs - if string(pfxKey[:4]) == string(gradPrefix) { - nGrads++ - } - nOutputs++ - +// RemoveChannel channel erases all entries from the channel bucket for the +// provided channel point. +func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { + return ns.db.Update(func(tx *bolt.Tx) error { + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { return nil + } - }); err != nil { + // Retrieve the channel index stored in the chain bucket. + chanIndex := chainBucket.Bucket(channelIndexKey) + if chanIndex == nil { + return nil + } + + // Serialize the provided channel point, such that we can delete + // the mature channel bucket. + var chanBuffer bytes.Buffer + if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { return err } - utxnLog.Infof("Found %d graduated outputs out of %d", nGrads, nOutputs) - // Channel is mature if all outputs are graduated. - if nGrads == nOutputs { - isMature = true + err := chanIndex.DeleteBucket(chanBuffer.Bytes()) + if err != nil { + return err } return nil - }); err != nil { - return false, err - } - - return isMature, nil + }) } -// The Point of No Return. - // LastFinalizedHeight returns the last block height for which the nursery // store has purged all persistent state. This occurs after a fixed interval // for reorg safety. func (ns *nurseryStore) LastFinalizedHeight() (uint32, error) { var lastFinalizedHeight uint32 err := ns.db.View(func(tx *bolt.Tx) error { - lastHeight, err := ns.getLastFinalizedHeight(tx) - if err != nil { - return err - } - - lastFinalizedHeight = lastHeight - - return nil + var err error + lastFinalizedHeight, err = ns.getLastFinalizedHeight(tx) + return err }) + return lastFinalizedHeight, err } -// getLastFinalizedHeight is a helper method that retrieves the last height for -// which the database finalized its persistent state. -func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) { - // Retrieve the chain bucket associated with the given nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return 0, nil - } +// Helper Methods - // Lookup the last finalized height in the top-level chain bucket. - heightBytes := chainBucket.Get(lastFinalizedHeightKey) - - // If the resulting bytes are not sized like a uint32, then we have - // never finalized, so we return 0. - if len(heightBytes) != 4 { - return 0, nil - } - - // Otherwise, parse the bytes and return the last finalized height. - return byteOrder.Uint32(heightBytes), nil -} - -func (ns *nurseryStore) getNextLastFinalizedHeight(tx *bolt.Tx, height uint32) (uint32, error) { - // Retrieve the previous last finalized height. - lastHeight, err := ns.getLastFinalizedHeight(tx) - if err != nil { - return 0, err - } - - // TODO(conner): add lower bound after which all state is purged. - - // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return lastHeight, nil - } - - // Retrieve the existing height index. - hghtIndex := chainBucket.Bucket(heightIndexKey) - if hghtIndex == nil { - return lastHeight, nil - } - - for curHeight := lastHeight + 1; curHeight <= height; curHeight++ { - var curHeightBytes [4]byte - byteOrder.PutUint32(curHeightBytes[:], curHeight) - - hghtBucket := hghtIndex.Bucket(curHeightBytes[:]) - if hghtBucket == nil { - continue - } - - nChildren, err := ns.numChildrenInBucket(hghtBucket) - if err != nil { - return 0, err - } - - if nChildren > 0 { - return curHeight - 1, nil - } - } - - return height, nil -} - -// pubLastFinalizedHeight is a helper method that writes the provided height -// under the last finalized height key. -func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx, - height uint32) error { - - // Ensure that the chain bucket for this nursery store exists. - chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) +// enterCrib accepts a new htlc output that the nursery will incubate through +// its two-stage process of sweeping funds back to the user's wallet. These +// outputs are persisted in the nursery store's crib bucket, and will be +// revisited after the output's CLTV has expired. +func (ns *nurseryStore) enterCrib(tx *bolt.Tx, baby *babyOutput) error { + // First, retrieve or create the channel bucket corresponding to the + // baby output's origin channel point. + chanPoint := baby.OriginChanPoint() + chanBucket, err := ns.createChannelBucket(tx, chanPoint) if err != nil { return err } - // Serialize the provided last-finalized height, and store it in the - // top-level chain bucket for this nursery store. - var lastHeightBytes [4]byte - byteOrder.PutUint32(lastHeightBytes[:], height) + // Next, retrieve or create the height-channel bucket located in the + // height bucket corresponding to the baby output's CLTV expiry height. + hghtChanBucket, err := ns.createHeightChanBucket(tx, + baby.expiry, chanPoint) + if err != nil { + return err + } - return chainBucket.Put(lastFinalizedHeightKey, lastHeightBytes[:]) + // Since we are inserting this output into the crib bucket, we create a + // key that prefixes the baby output's outpoint with the crib prefix. + pfxOutputKey, err := prefixOutputKey(cribPrefix, baby.OutPoint()) + if err != nil { + return err + } + + // Serialize the baby output so that it can be written to the underlying + // key-value store. + var babyBuffer bytes.Buffer + if err := baby.Encode(&babyBuffer); err != nil { + return err + } + babyBytes := babyBuffer.Bytes() + + // Now, insert the serialized output into its channel bucket under the + // prefixed key created above. + if err := chanBucket.Put(pfxOutputKey, babyBytes); err != nil { + return err + } + + // Finally, create a corresponding bucket in the height-channel bucket + // for this crib output. The existence of this bucket indicates that the + // serialized output can be retrieved from the channel bucket using the + // same prefix key. + return hghtChanBucket.Put(pfxOutputKey, []byte{}) +} + +// enterPreschool accepts a new commitment output that the nursery will incubate +// through a single stage before sweeping. Outputs are stored in the preschool +// bucket until the commitment transaction has been confirmed, at which point +// they will be moved to the kindergarten bucket. +func (ns *nurseryStore) enterPreschool(tx *bolt.Tx, kid *kidOutput) error { + // First, retrieve or create the channel bucket corresponding to the + // baby output's origin channel point. + chanPoint := kid.OriginChanPoint() + chanBucket, err := ns.createChannelBucket(tx, chanPoint) + if err != nil { + return err + } + + // Since the babyOutput is being inserted into the preschool bucket, we + // create a key that prefixes its outpoint with the preschool prefix. + pfxOutputKey, err := prefixOutputKey(psclPrefix, kid.OutPoint()) + if err != nil { + return err + } + + // Serialize the kidOutput and insert it into the channel bucket. + var kidBuffer bytes.Buffer + if err := kid.Encode(&kidBuffer); err != nil { + return err + } + + return chanBucket.Put(pfxOutputKey, kidBuffer.Bytes()) } // createChannelBucket creates or retrieves a channel bucket for the provided @@ -1074,22 +942,22 @@ func (ns *nurseryStore) createHeightBucket(tx *bolt.Tx, return hghtIndex.CreateBucketIfNotExists(heightBytes[:]) } -// getHeightBucket retrieves an existing height bucket from the nursery store, +// getHeightBucketPath retrieves an existing height bucket from the nursery store, // using the provided block height. If the bucket does not exist, or any bucket // along its path does not exist, a nil value is returned. -func (ns *nurseryStore) getHeightBucket(tx *bolt.Tx, - height uint32) *bolt.Bucket { +func (ns *nurseryStore) getHeightBucketPath(tx *bolt.Tx, + height uint32) (*bolt.Bucket, *bolt.Bucket, *bolt.Bucket) { // Retrieve the existing chain bucket for this nursery store. chainBucket := tx.Bucket(ns.pfxChainKey) if chainBucket == nil { - return nil + return nil, nil, nil } // Retrieve the existing channel index. hghtIndex := chainBucket.Bucket(heightIndexKey) if hghtIndex == nil { - return nil + return nil, nil, nil } // Serialize the provided block height and return the bucket matching @@ -1097,7 +965,47 @@ func (ns *nurseryStore) getHeightBucket(tx *bolt.Tx, var heightBytes [4]byte byteOrder.PutUint32(heightBytes[:], height) - return hghtIndex.Bucket(heightBytes[:]) + return chainBucket, hghtIndex, hghtIndex.Bucket(heightBytes[:]) +} + +// getHeightBucket retrieves an existing height bucket from the nursery store, +// using the provided block height. If the bucket does not exist, or any bucket +// along its path does not exist, a nil value is returned. +func (ns *nurseryStore) getHeightBucket(tx *bolt.Tx, + height uint32) *bolt.Bucket { + _, _, hghtBucket := ns.getHeightBucketPath(tx, height) + + return hghtBucket +} + +// deleteHeightBucket ensures that the height bucket at the provided index is +// purged from the nursery store. +func (ns *nurseryStore) deleteHeightBucket(tx *bolt.Tx, height uint32) error { + // Ensure that the chain bucket for this nursery store exists. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { + return nil + } + + // Ensure that the height index has been properly initialized for this + // chain. + hghtIndex := chainBucket.Bucket(heightIndexKey) + if hghtIndex == nil { + return nil + } + + // Serialize the provided height, as this will form the name of the + // bucket. + var heightBytes [4]byte + byteOrder.PutUint32(heightBytes[:], height) + + // Finally, delete the bucket in question. + err := hghtIndex.DeleteBucket(heightBytes[:]) + if err != nil && err != bolt.ErrBucketNotFound { + return err + } + + return nil } // createHeightChanBucket creates or retrieves an existing height-channel bucket @@ -1156,108 +1064,149 @@ func (ns *nurseryStore) getHeightChanBucket(tx *bolt.Tx, // enumerate crib and kindergarten outputs at a particular height. The callback // is invoked with serialized bytes retrieved for each output of interest, // allowing the caller to deserialize them into the appropriate type. -func (ns *nurseryStore) forEachHeightPrefix(prefix []byte, height uint32, - callback func([]byte) error) error { - - return ns.db.View(func(tx *bolt.Tx) error { - // Start by retrieving the height bucket corresponding to the - // provided block height. - hghtBucket := ns.getHeightBucket(tx, height) - if hghtBucket == nil { - return nil - } - - // Using the height bucket as a starting point, we will traverse - // its entire two-tier directory structure, and filter for - // outputs that have the provided prefix. The first layer of the - // height bucket contains buckets identified by a channel point, - // thus we first create list of channels contained in this - // height bucket. - var channelsAtHeight [][]byte - if err := hghtBucket.ForEach(func(chanBytes, _ []byte) error { - channelsAtHeight = append(channelsAtHeight, chanBytes) - return nil - }); err != nil { - return err - } - - // As we enumerate the outputs referenced in this height bucket, - // we will need to load the serialized value of each output, - // which is ultimately stored its respective channel bucket. To - // do so, we first load the top-level chain bucket, which should - // already be created if we are at this point. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return nil - } - - // Additionally, grab the chain index, which we will facilitate - // queries for each of the channel buckets of each of the - // channels in the list we assembled above. - chanIndex := chainBucket.Bucket(channelIndexKey) - if chanIndex == nil { - return nil - } - - // Now, we are ready to enumerate all outputs with the desired - // prefix t this block height. We do so by iterating over our - // list of channels at this height, filtering for outputs in - // each height-channel bucket that begin with the given prefix, - // and then retrieving the serialized outputs from the - // appropriate channel bucket. - for _, chanBytes := range channelsAtHeight { - // Retrieve the height-channel bucket for this channel, - // which holds a sub-bucket for all outputs maturing at - // this height. - hghtChanBucket := hghtBucket.Bucket(chanBytes) - if hghtChanBucket == nil { - continue - } - - // Load the appropriate channel bucket from the channel - // index, this will allow us to retrieve the individual - // serialized outputs. - chanBucket := chanIndex.Bucket(chanBytes) - if chanBucket == nil { - continue - } - - // Since all of the outputs of interest will start with - // the same prefix, we will perform a prefix scan of the - // buckets contained in the height-channel bucket, - // efficiently enumerating the desired outputs. - c := hghtChanBucket.Cursor() - - // Seek to and iterate over all entries starting with - // the given prefix. - var pfxOutputKey, _ = c.Seek(prefix) - for bytes.HasPrefix(pfxOutputKey, prefix) { - - // Use the prefix output key emitted from our - // scan to load the serialized babyOutput from - // the appropriate channel bucket. - outputBytes := chanBucket.Get(pfxOutputKey) - if outputBytes == nil { - pfxOutputKey, _ = c.Next() - continue - } - - // Present the serialized bytes to our call back - // function, which is responsible for - // deserializing the bytes into the appropriate - // type. - if err := callback(outputBytes); err != nil { - return err - } - - // Lastly, advance our prefix output key for the - // next iteration. - pfxOutputKey, _ = c.Next() - } - } +func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, + height uint32, callback func([]byte) error) error { + // Start by retrieving the height bucket corresponding to the provided + // block height. + chainBucket, _, hghtBucket := ns.getHeightBucketPath(tx, height) + if hghtBucket == nil { return nil - }) + } + + // Using the height bucket as a starting point, we will traverse its + // entire two-tier directory structure, and filter for outputs that have + // the provided prefix. The first layer of the height bucket contains + // buckets identified by a channel point, thus we first create list of + // channels contained in this height bucket. + var channelsAtHeight [][]byte + if err := hghtBucket.ForEach(func(chanBytes, _ []byte) error { + channelsAtHeight = append(channelsAtHeight, chanBytes) + return nil + }); err != nil { + return err + } + + // Additionally, grab the chain index, which we will facilitate queries + // for each of the channel buckets of each of the channels in the list + // we assembled above. + chanIndex := chainBucket.Bucket(channelIndexKey) + if chanIndex == nil { + return errors.New("unable to retrieve channel index") + } + + // Now, we are ready to enumerate all outputs with the desired prefix at + // this block height. We do so by iterating over our list of channels at + // this height, filtering for outputs in each height-channel bucket that + // begin with the given prefix, and then retrieving the serialized + // outputs from the appropriate channel bucket. + for _, chanBytes := range channelsAtHeight { + // Retrieve the height-channel bucket for this channel, which + // holds a sub-bucket for all outputs maturing at this height. + hghtChanBucket := hghtBucket.Bucket(chanBytes) + if hghtChanBucket == nil { + return errors.New("unable to retrieve height-channel bucket") + } + + // Load the appropriate channel bucket from the channel index, + // this will allow us to retrieve the individual serialized + // outputs. + chanBucket := chanIndex.Bucket(chanBytes) + if chanBucket == nil { + return errors.New("unable to retrieve channel bucket") + } + + // Since all of the outputs of interest will start with the same + // prefix, we will perform a prefix scan of the buckets + // contained in the height-channel bucket, efficiently + // enumerating the desired outputs. + c := hghtChanBucket.Cursor() + + // Seek to and iterate over all entries starting with the given + // prefix. + pfxOutputKey, _ := c.Seek(prefix) + for bytes.HasPrefix(pfxOutputKey, prefix) { + + // Use the prefix output key emitted from our scan to + // load the serialized babyOutput from the appropriate + // channel bucket. + outputBytes := chanBucket.Get(pfxOutputKey) + if outputBytes == nil { + return errors.New("unable to retrieve output") + } + + // Present the serialized bytes to our call back + // function, which is responsible for deserializing the + // bytes into the appropriate type. + if err := callback(outputBytes); err != nil { + return err + } + + // Lastly, advance our prefix output key for the next + // iteration. + pfxOutputKey, _ = c.Next() + } + } + + return nil +} + +// forChanOutputs enumerates the outputs contained in a channel bucket to the +// provided callback. The callback accepts a key-value pair of byte slices +// corresponding to the prefixed-output key and the serialized output, +// respectively. +func (ns *nurseryStore) forChanOutputs(tx *bolt.Tx, chanPoint *wire.OutPoint, + callback func([]byte, []byte) error) error { + + chanBucket := ns.getChannelBucket(tx, chanPoint) + if chanBucket == nil { + return ErrContractNotFound + } + + return chanBucket.ForEach(callback) +} + +// getLastFinalizedHeight is a helper method that retrieves the last height for +// which the database finalized its persistent state. +func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) { + // Retrieve the chain bucket associated with the given nursery store. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { + return 0, nil + } + + // Lookup the last finalized height in the top-level chain bucket. + heightBytes := chainBucket.Get(lastFinalizedHeightKey) + + // If the resulting bytes are not sized like a uint32, then we have + // never finalized, so we return 0. + if len(heightBytes) != 4 { + return 0, nil + } + + // Otherwise, parse the bytes and return the last finalized height. + return byteOrder.Uint32(heightBytes), nil +} + +// pubLastFinalizedHeight is a helper method that writes the provided height +// under the last finalized height key. +func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx, + height uint32) error { + + // Ensure that the chain bucket for this nursery store exists. + chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) + if err != nil { + return err + } + + // TODO(conner): purge all state below reorg depth. + + // Serialize the provided last-finalized height, and store it in the + // top-level chain bucket for this nursery store. + var lastHeightBytes [4]byte + byteOrder.PutUint32(lastHeightBytes[:], height) + + return chainBucket.Put(lastFinalizedHeightKey, lastHeightBytes[:]) } var ( @@ -1270,206 +1219,90 @@ var ( ErrBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") ) -// deleteAndPruneHeight removes an output from a channel bucket matching the -// provided channel point. The output is assumed top be in the kindergarten -// bucket, since pruning should never occur for any other type of output. If -// after deletion, the channel bucket is empty, this method will attempt to -// delete the bucket as well. -// NOTE: This method returns two concrete errors apart from those returned by -// the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These -// should be handled in the context of the caller so as they may be benign -// depending on context. Errors returned other than these two should be -// interpreted as database errors. -func (ns *nurseryStore) deleteAndPruneChannel(tx *bolt.Tx, - chanPoint, outpoint *wire.OutPoint) error { - - // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return nil - } - - // Retrieve the channel index stored in the chain bucket. - chanIndex := chainBucket.Bucket(channelIndexKey) - if chanIndex == nil { - return nil - } - - // Serialize the provided channel point, such that we can retrieve the - // desired channel bucket. - var chanBuffer bytes.Buffer - if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { - return err - } - chanBytes := chanBuffer.Bytes() - - // Retrieve the existing channel bucket. If none exists, then our job is - // complete and it is safe to return nil. - chanBucket := chanIndex.Bucket(chanBytes) - if chanBucket == nil { - return nil - } - - // Otherwise, the bucket still exists. Serialize the outpoint that needs - // deletion, prefixing the key with kindergarten prefix. Since all - // outputs eventually make their way to becoming kindergarten outputs, - // we can safely assume that they will be stored with a kindergarten - // prefix. - pfxOutputBytes, err := prefixOutputKey(kndrPrefix, outpoint) - if err != nil { - return err - } - - // Remove the output in question using the kindergarten-prefixed key we - // generated above. - if err := chanBucket.Delete(pfxOutputBytes); err != nil { - return err - } - - // Finally, now that the outpoint has been removed from this channel - // bucket, try to remove the channel bucket altogether if it is now - // empty. - return ns.removeBucketIfEmpty(chanIndex, chanBytes) -} - -func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { - return ns.db.Update(func(tx *bolt.Tx) error { - // Retrieve the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return nil - } - - // Retrieve the channel index stored in the chain bucket. - chanIndex := chainBucket.Bucket(channelIndexKey) - if chanIndex == nil { - return nil - } - - // Serialize the provided channel point, such that we can delete - // the mature channel bucket. - var chanBuffer bytes.Buffer - if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { - return err - } - - return chanIndex.DeleteBucket(chanBuffer.Bytes()) - }) -} - -// pruneHeight -// NOTE: This method returns two concrete errors apart from those returned by -// the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These -// should be handled in the context of the caller so as they may be benign -// depending on context. Errors returned other than these two should be -// interpreted as database errors. -func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) error { - - // Fetch the existing chain bucket for this nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return nil - } - - // Load the existing height bucket for the height in question. - hghtIndex := chainBucket.Bucket(heightIndexKey) - if hghtIndex == nil { - return nil - } - - // Serialize the provided block height, such that it can be used as the - // key to locate the desired height bucket. - var heightBytes [4]byte - byteOrder.PutUint32(heightBytes[:], height) - - // Retrieve the height bucket using the serialized height as the bucket - // name. - hghtBucket := hghtIndex.Bucket(heightBytes[:]) +// pruneHeight removes the height bucket at the provided height if and only if +// all active outputs at this height have been removed from their respective +// height-channel buckets. +func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) { + // Fetch the existing height index and height bucket. + _, hghtIndex, hghtBucket := ns.getHeightBucketPath(tx, height) if hghtBucket == nil { - return ErrBucketDoesNotExist + return false, nil } - // Iterate over the contents of this height bucket, which is comprised - // of sub-buckets named after the channel points that need attention at + // TODO(conner): fix this comment // this block height. We will attempt to remove each one if they are // empty, keeping track of the number of height-channel buckets that // still have active outputs. var nActiveBuckets int if err := hghtBucket.ForEach(func(chanBytes, _ []byte) error { - // Attempt to each height-channel bucket from the height bucket // located above. - err := ns.removeBucketIfEmpty(hghtBucket, chanBytes) - switch err { - case nil: - // The height-channel bucket was removed successfully! - return nil - - case ErrBucketDoesNotExist: - // The height-channel bucket could not be located--no - // harm, no foul. - return nil - - case ErrBucketNotEmpty: - // The bucket still has active outputs at this height, - // increment our number of still active height-channel - // buckets. - nActiveBuckets++ - return nil - - default: - // Database error! + _, err := ns.removeBucketIfEmpty(hghtBucket, chanBytes) + if err != nil && err != ErrBucketNotEmpty { return err + } else if err == ErrBucketNotEmpty { + nActiveBuckets++ } + return nil + }); err != nil { - return err + return false, err } // If we located any height-channel buckets that still have active // outputs, it is unsafe to delete this height bucket. Signal this event // to the caller so that they can determine the appropriate action. if nActiveBuckets > 0 { - return ErrBucketNotEmpty + return false, ErrBucketNotEmpty } + // Serialize the provided block height, such that it can be used as the + // key to delete desired height bucket. + var heightBytes [4]byte + byteOrder.PutUint32(heightBytes[:], height) + // All of the height-channel buckets are empty or have been previously - // removed, proceed by attempting to remove the height bucket + // removed, proceed by removing the height bucket // altogether. - return ns.removeBucketIfEmpty(hghtIndex, heightBytes[:]) + if err := hghtIndex.DeleteBucket(heightBytes[:]); err != nil { + return false, err + } + + return true, nil } // removeBucketIfEmpty attempts to delete a bucket specified by name from the // provided parent bucket. -// NOTE: This method returns two concrete errors apart from those returned by -// the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These -// should be handled in the context of the caller so as they may be benign -// depending on context. Errors returned other than these two should be -// interpreted as database errors. func (ns *nurseryStore) removeBucketIfEmpty(parent *bolt.Bucket, - bktName []byte) error { + bktName []byte) (bool, error) { // Attempt to fetch the named bucket from its parent. bkt := parent.Bucket(bktName) if bkt == nil { // No bucket was found, signal this to the caller. - return ErrBucketDoesNotExist + return false, nil } // The bucket exists, now compute how many children *it* has. nChildren, err := ns.numChildrenInBucket(bkt) if err != nil { - return err + return false, nil } // If the number of children is non-zero, alert the caller that the // named bucket is not being removed. if nChildren > 0 { - return ErrBucketNotEmpty + return false, nil } // Otherwise, remove the empty bucket from its parent. - return parent.DeleteBucket(bktName) + err = parent.DeleteBucket(bktName) + if err != nil { + return false, err + } + + return true, nil } // numChildrenInBucket computes the number of children contained in the given