diff --git a/nursery_store.go b/nursery_store.go index 32aaaaf9..5d694aba 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -114,11 +114,13 @@ type NurseryStore interface { // transaction. PreschoolToKinder(*kidOutput) error - // GraduateKinder accepts a slice of kidOutputs from the kindergarten - // bucket and marks them graduated. It also removes their corresponding - // entries from the height and channel indexes, pruning any intermediate - // buckets that become empty. - GraduateKinder(height uint32, kndrOutputs []kidOutput) error + // GraduateKinder atomically moves the kindergarten class at the + // provided height into the graduated status. This involves removing the + // kindergarten entries from both the height and channel indexes, and + // cleaning up the finalized kindergarten sweep txn. The height bucket + // will be opportunistically pruned from the height index as outputs are + // removed. + GraduateKinder(height uint32) error // FetchPreschools returns a list of all outputs currently stored in the // preschool bucket. @@ -155,6 +157,9 @@ type NurseryStore interface { // whose type should be inferred from the key's prefix. ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error + // ListChannels returns all channels the nursery is currently tracking. + ListChannels() ([]wire.OutPoint, error) + // IsMatureChannel determines the whether or not all of the outputs in a // particular channel bucket have been marked as graduated. IsMatureChannel(*wire.OutPoint) (bool, error) @@ -344,6 +349,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { return err } + // Remove the crib output's entry in the height index. err = ns.removeOutputFromHeight(tx, bby.expiry, chanPoint, pfxOutputKey) if err != nil { @@ -467,74 +473,88 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { }) } -// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket and -// marks them as graduated. This method also removes their corresponding entries -// from the height and channel indexes corresponding to their kindergarten -// status. It also ensures that the finalized kindergarten sweep txn is removed -// for this height. -func (ns *nurseryStore) GraduateKinder(height uint32, kids []kidOutput) error { - if err := ns.db.Update(func(tx *bolt.Tx) error { - for _, kid := range kids { - - outpoint := kid.OutPoint() - chanPoint := kid.OriginChanPoint() - - // Construct the key under which the output is currently - // stored height and channel indexes. - pfxOutputKey, err := prefixOutputKey(kndrPrefix, - outpoint) - if err != nil { - return err - } - - // Remove the grad output's entry in the height index. - err = ns.removeOutputFromHeight(tx, height, chanPoint, - pfxOutputKey) - if err != nil { - return err - } - - chanBucket := ns.getChannelBucket(tx, chanPoint) - if chanBucket == nil { - return ErrContractNotFound - } - - // Remove previous output with kindergarten prefix. - if err := chanBucket.Delete(pfxOutputKey); err != nil { - return err - } - - // Convert kindergarten key to graduate key. - copy(pfxOutputKey, gradPrefix) - - var gradBuffer bytes.Buffer - if err := kid.Encode(&gradBuffer); err != nil { - return err - } - - // Insert serialized output into channel bucket using - // graduate-prefixed key. - err = chanBucket.Put(pfxOutputKey, gradBuffer.Bytes()) - if err != nil { - return err - } - } - - hghtBucket := ns.getHeightBucket(tx, height) - if hghtBucket == nil { - return nil - } +// GraduateKinder atomically moves the kindergarten class at the provided height +// into the graduated status. This involves removing the kindergarten entries +// from both the height and channel indexes, and cleaning up the finalized +// kindergarten sweep txn. The height bucket will be opportunistically pruned +// from the height index as outputs are removed. +func (ns *nurseryStore) GraduateKinder(height uint32) error { + return ns.db.Update(func(tx *bolt.Tx) error { // Since all kindergarten outputs at a particular height are // swept in a single txn, we can now safely delete the finalized // txn, since it has already been broadcast and confirmed. - return hghtBucket.Delete(finalizedKndrTxnKey) + hghtBucket := ns.getHeightBucket(tx, height) + if hghtBucket == nil { + // Nothing to delete, bucket has already been removed. + return nil + } - }); err != nil { - return err - } + // Remove the finalized kindergarten txn, we do this before + // removing the outputs so that the extra entry doesn't prevent + // the height bucket from being opportunistically pruned below. + if err := hghtBucket.Delete(finalizedKndrTxnKey); err != nil { + return err + } - return nil + // For each kindergarten found output, delete its entry from the + // height and channel index, and create a new grad output in the + // channel index. + return ns.forEachHeightPrefix(tx, kndrPrefix, height, + func(v []byte) error { + var kid kidOutput + err := kid.Decode(bytes.NewReader(v)) + if err != nil { + return err + } + + outpoint := kid.OutPoint() + chanPoint := kid.OriginChanPoint() + + // Construct the key under which the output is + // currently stored height and channel indexes. + pfxOutputKey, err := prefixOutputKey(kndrPrefix, + outpoint) + if err != nil { + return err + } + + // Remove the grad output's entry in the height + // index. + err = ns.removeOutputFromHeight(tx, height, + chanPoint, pfxOutputKey) + if err != nil { + return err + } + + chanBucket := ns.getChannelBucket(tx, + chanPoint) + if chanBucket == nil { + return ErrContractNotFound + } + + // Remove previous output with kindergarten + // prefix. + err = chanBucket.Delete(pfxOutputKey) + if err != nil { + return err + } + + // Convert kindergarten key to graduate key. + copy(pfxOutputKey, gradPrefix) + + var gradBuffer bytes.Buffer + if err := kid.Encode(&gradBuffer); err != nil { + return err + } + + // Insert serialized output into channel bucket + // using graduate-prefixed key. + return chanBucket.Put(pfxOutputKey, + gradBuffer.Bytes()) + }, + ) + }) } // FinalizeKinder accepts a block height as a parameter and purges its @@ -555,7 +575,7 @@ func (ns *nurseryStore) FinalizeKinder(height uint32, // finalized. func (ns *nurseryStore) PurgeHeight(height uint32) error { return ns.db.Update(func(tx *bolt.Tx) error { - if err := ns.deleteHeightBucket(tx, height); err != nil { + if err := ns.purgeHeightBucket(tx, height); err != nil { return err } @@ -601,7 +621,8 @@ func (ns *nurseryStore) FetchClass( return nil - }); err != nil { + }, + ); err != nil { return err } @@ -677,17 +698,14 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { // the channel bucket to efficiently enumerate all the // desired outputs. c := chanBucket.Cursor() - - // Seek and iterate over all outputs starting with the - // prefix "pscl". - pfxOutputKey, kidBytes := c.Seek(psclPrefix) - for bytes.HasPrefix(pfxOutputKey, psclPrefix) { + for k, v := c.Seek(psclPrefix); bytes.HasPrefix( + k, psclPrefix); k, v = c.Next() { // Deserialize each output as a kidOutput, since // this should have been the type that was // serialized when it was written to disk. var psclOutput kidOutput - psclReader := bytes.NewReader(kidBytes) + psclReader := bytes.NewReader(v) err := psclOutput.Decode(psclReader) if err != nil { return err @@ -696,8 +714,6 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { // Add the deserialized output to our list of // preschool outputs. kids = append(kids, psclOutput) - - pfxOutputKey, kidBytes = c.Next() } } @@ -724,9 +740,39 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint, }) } -// errImmatureChannel signals that not all outputs in a channel bucket have -// graduated. -var errImmatureChannel = errors.New("channel has non-graduated outputs") +// ListChannels returns all channels the nursery is currently tracking. +func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) { + var activeChannels []wire.OutPoint + if err := ns.db.View(func(tx *bolt.Tx) error { + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { + return nil + } + + // Retrieve the existing channel index. + chanIndex := chainBucket.Bucket(channelIndexKey) + if chanIndex == nil { + return nil + } + + return chanIndex.ForEach(func(chanBytes, _ []byte) error { + var chanPoint wire.OutPoint + err := readOutpoint(bytes.NewReader(chanBytes), &chanPoint) + if err != nil { + return err + } + + activeChannels = append(activeChannels, chanPoint) + + return nil + }) + }); err != nil { + return nil, err + } + + return activeChannels, nil +} // IsMatureChannel determines the whether or not all of the outputs in a // particular channel bucket have been marked as graduated. @@ -738,19 +784,24 @@ func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) return ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error { if !bytes.HasPrefix(pfxKey, gradPrefix) { - return errImmatureChannel + return ErrImmatureChannel } return nil }) }) - if err != nil && err != errImmatureChannel { + if err != nil && err != ErrImmatureChannel { return false, err } return err == nil, nil } +// ErrImmatureChannel signals a channel cannot be removed because not all of its +// outputs have graduated. +var ErrImmatureChannel = errors.New("cannot remove immature channel, " + + "still has ungraduated outputs") + // RemoveChannel channel erases all entries from the channel bucket for the // provided channel point. // NOTE: The channel's entries in the height index are assumed to be removed. @@ -778,7 +829,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { err := ns.forChanOutputs(tx, chanPoint, func(k, v []byte) error { if !bytes.HasPrefix(k, gradPrefix) { - return errors.New("expected grad output") + return ErrImmatureChannel } // Construct a kindergarten prefixed key, since this @@ -1166,7 +1217,8 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, // outputs. chanBucket := chanIndex.Bucket(chanBytes) if chanBucket == nil { - return fmt.Errorf("unable to retrieve channel bucket: '%x'", chanBytes) + return fmt.Errorf("unable to retrieve channel "+ + "bucket: '%x'", chanBytes) } // Since all of the outputs of interest will start with the same @@ -1174,16 +1226,13 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, // contained in the height-channel bucket, efficiently // enumerating the desired outputs. c := hghtChanBucket.Cursor() - - // Seek to and iterate over all entries starting with the given - // prefix. - pfxOutputKey, _ := c.Seek(prefix) - for bytes.HasPrefix(pfxOutputKey, prefix) { + for k, _ := c.Seek(prefix); bytes.HasPrefix( + k, prefix); k, _ = c.Next() { // Use the prefix output key emitted from our scan to // load the serialized babyOutput from the appropriate // channel bucket. - outputBytes := chanBucket.Get(pfxOutputKey) + outputBytes := chanBucket.Get(k) if outputBytes == nil { return errors.New("unable to retrieve output") } @@ -1194,10 +1243,6 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, if err := callback(outputBytes); err != nil { return err } - - // Lastly, advance our prefix output key for the next - // iteration. - pfxOutputKey, _ = c.Next() } } @@ -1230,12 +1275,13 @@ func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) { // Lookup the last finalized height in the top-level chain bucket. heightBytes := chainBucket.Get(lastFinalizedHeightKey) + if heightBytes == nil { + // We have never finalized, return height 0. + return 0, nil + } // If the resulting bytes are not sized like a uint32, then we have // never finalized, so we return 0. - if len(heightBytes) != 4 { - return 0, nil - } // Otherwise, parse the bytes and return the last finalized height. return byteOrder.Uint32(heightBytes), nil @@ -1328,16 +1374,14 @@ func (ns *nurseryStore) getLastPurgedHeight(tx *bolt.Tx) (uint32, error) { return 0, nil } - // Lookup the last finalized height in the top-level chain bucket. + // Lookup the last purged height in the top-level chain bucket. heightBytes := chainBucket.Get(lastPurgedHeightKey) - - // If the resulting bytes are not sized like a uint32, then we have - // never finalized, so we return 0. - if len(heightBytes) != 4 { + if heightBytes == nil { + // We have never purged before, return height 0. return 0, nil } - // Otherwise, parse the bytes and return the last finalized height. + // Otherwise, parse the bytes and return the last purged height. return byteOrder.Uint32(heightBytes), nil } @@ -1351,7 +1395,7 @@ func (ns *nurseryStore) putLastPurgedHeight(tx *bolt.Tx, height uint32) error { return err } - // Serialize the provided last-finalized height, and store it in the + // Serialize the provided last-purged height, and store it in the // top-level chain bucket for this nursery store. var lastHeightBytes [4]byte byteOrder.PutUint32(lastHeightBytes[:], height) @@ -1359,9 +1403,9 @@ func (ns *nurseryStore) putLastPurgedHeight(tx *bolt.Tx, height uint32) error { return chainBucket.Put(lastPurgedHeightKey, lastHeightBytes[:]) } -// ErrBucketNotEmpty signals that an attempt to prune a particular +// errBucketNotEmpty signals that an attempt to prune a particular // bucket failed because it still has active outputs. -var ErrBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") +var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") // removeOutputFromHeight will delete the given output from the specified // height-channel bucket, and attempt to prune the upstream directories if they @@ -1396,16 +1440,16 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bolt.Tx, height uint32, // Try to remove the channel-height bucket if it this was the last // output in the bucket. err := removeBucketIfEmpty(hghtBucket, chanBuffer.Bytes()) - if err != nil && err != ErrBucketNotEmpty { + if err != nil && err != errBucketNotEmpty { return err - } else if err == ErrBucketNotEmpty { + } else if err == errBucketNotEmpty { return nil } // Attempt to prune the height bucket matching the kid output's // confirmation height in case that was the last height-chan bucket. pruned, err := ns.pruneHeight(tx, height) - if err != nil && err != ErrBucketNotEmpty { + if err != nil && err != errBucketNotEmpty { return err } else if err == nil && pruned { utxnLog.Infof("Height bucket %d pruned", height) @@ -1493,11 +1537,11 @@ func removeBucketIfExists(parent *bolt.Bucket, bktName []byte) error { return parent.DeleteBucket(bktName) } -// isBucketEmpty returns ErrBucketNotEmpty if the bucket has a non-zero number +// isBucketEmpty returns errBucketNotEmpty if the bucket has a non-zero number // of children. func isBucketEmpty(parent *bolt.Bucket) error { return parent.ForEach(func(_, _ []byte) error { - return ErrBucketNotEmpty + return errBucketNotEmpty }) }