From f231113b9076b0ba4eb823ccc0eb79c80bac58df Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Sun, 31 Jan 2016 15:02:38 -0800 Subject: [PATCH] some cleanup, store spent txs pretty much everything is db based now. Still some concurrency issues when multiple block inv messages come in (which they do) where we used len(mBlockQueue) as a check for being synched up, which doesn't quite work. Find a better way to do that... --- uspv/eight333.go | 2 +- uspv/msghandler.go | 3 +- uspv/txstore.go | 9 --- uspv/utxodb.go | 149 ++++++++++++++++----------------------------- 4 files changed, 55 insertions(+), 108 deletions(-) diff --git a/uspv/eight333.go b/uspv/eight333.go index 49a8a176..0d9bb007 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -76,11 +76,11 @@ func OpenSPV(remoteNode string, hfn, tsfn string, s.localVersion = VERSION // transaction store for this SPV connection + inTs.Param = p err = inTs.OpenDB(tsfn) if err != nil { return s, err } - inTs.Param = p s.TS = inTs // copy pointer of txstore into spvcon myMsgVer, err := wire.NewMsgVersionFromConn(s.con, 0, 0) diff --git a/uspv/msghandler.go b/uspv/msghandler.go index 3e1ffef9..a6a8e0cc 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -114,7 +114,6 @@ func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) { if moar { s.AskForHeaders() } else { // no moar, done w/ headers, get merkleblocks - fmt.Printf("locks here...?? ") s.headerMutex.Lock() endPos, err := s.headerFile.Seek(0, os.SEEK_END) if err != nil { @@ -164,7 +163,7 @@ func (s *SPVCon) InvHandler(m *wire.MsgInv) { s.AskForTx(thing.Hash) } if thing.Type == wire.InvTypeBlock { // new block, ingest - if len(s.mBlockQueue) == 0 { + if len(s.mBlockQueue) == 0 { // this is not a good check... // don't ask directly; instead ask for header fmt.Printf("asking for headers due to inv block\n") s.AskForHeaders() diff --git a/uspv/txstore.go b/uspv/txstore.go index 8eab9947..15b6d562 100644 --- a/uspv/txstore.go +++ b/uspv/txstore.go @@ -60,15 +60,6 @@ func NewTxStore(rootkey *hdkeychain.ExtendedKey) TxStore { return txs } -// add addresses into the TxStore in memory -func (t *TxStore) AddAdr(a btcutil.Address, kidx uint32) { - var ma MyAdr - ma.PkhAdr = a - ma.KeyIdx = kidx - t.Adrs = append(t.Adrs, ma) - return -} - // add txid of interest func (t *TxStore) AddTxid(txid *wire.ShaHash, height int32) error { if txid == nil { diff --git a/uspv/utxodb.go b/uspv/utxodb.go index 34e48527..25e0bad8 100644 --- a/uspv/utxodb.go +++ b/uspv/utxodb.go @@ -26,12 +26,13 @@ var ( func (ts *TxStore) OpenDB(filename string) error { var err error + var numKeys uint32 ts.StateDB, err = bolt.Open(filename, 0644, nil) if err != nil { return err } // create buckets if they're not already there - return ts.StateDB.Update(func(btx *bolt.Tx) error { + err = ts.StateDB.Update(func(btx *bolt.Tx) error { _, err = btx.CreateBucketIfNotExists(BKTUtxos) if err != nil { return err @@ -44,12 +45,37 @@ func (ts *TxStore) OpenDB(filename string) error { if err != nil { return err } - _, err = btx.CreateBucketIfNotExists(BKTState) + sta, err := btx.CreateBucketIfNotExists(BKTState) if err != nil { return err } + + numKeysBytes := sta.Get(KEYNumKeys) + if numKeysBytes != nil { // NumKeys exists, read into uint32 + buf := bytes.NewBuffer(numKeysBytes) + err := binary.Read(buf, binary.BigEndian, &numKeys) + if err != nil { + return err + } + fmt.Printf("db says %d keys\n", numKeys) + } else { // no adrs yet, make it 1 (why...?) + numKeys = 1 + var buf bytes.Buffer + err = binary.Write(&buf, binary.BigEndian, numKeys) + if err != nil { + return err + } + err = sta.Put(KEYNumKeys, buf.Bytes()) + if err != nil { + return err + } + } return nil }) + if err != nil { + return err + } + return ts.PopulateAdrs(numKeys) } // NewAdr creates a new, never before seen address, and increments the @@ -85,7 +111,11 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) { return nil, err } // add in to ram. - ts.AddAdr(newAdr, n) + var ma MyAdr + ma.PkhAdr = newAdr + ma.KeyIdx = n + ts.Adrs = append(ts.Adrs, ma) + return newAdr, nil } @@ -191,8 +221,11 @@ func (ts *TxStore) PopulateAdrs(lastKey uint32) error { if err != nil { return err } + var ma MyAdr + ma.PkhAdr = newAdr + ma.KeyIdx = k + ts.Adrs = append(ts.Adrs, ma) - ts.AddAdr(newAdr, k) } return nil } @@ -256,7 +289,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) { // get all 4 buckets duf := btx.Bucket(BKTUtxos) // sta := btx.Bucket(BKTState) - // old := btx.Bucket(BKTStxos) + old := btx.Bucket(BKTStxos) // txns := btx.Bucket(BKTTxns) // first see if we lose utxos @@ -280,11 +313,25 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) { if err != nil { return err } + // after deletion, save stxo to old bucket + var st Stxo // generate spent txo + st.Utxo = lostTxo // assign outpoint + st.SpendHeight = height // spent at height + st.SpendTxid = tx.TxSha() // spent by txid + stxb, err := st.ToBytes() // serialize + if err != nil { + return err + } + err = old.Put(k, stxb) // write k:v outpoint:stxo bytes + if err != nil { + return err + } + return nil // matched utxo k, won't match another } return nil // no match }) - } // done losing utxos + } // done losing utxos, next gain utxos // next add all new utxos to db, this is quick as the work is above for _, ub := range nUtxoBytes { err = duf.Put(ub[:36], ub[36:]) @@ -297,34 +344,6 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) { return hits, err } -// SaveToDB write a utxo to disk, overwriting an old utxo of the same outpoint -func (ts *TxStore) SaveUtxo(u *Utxo) error { - b, err := u.ToBytes() - if err != nil { - return err - } - - err = ts.StateDB.Update(func(btx *bolt.Tx) error { - duf := btx.Bucket(BKTUtxos) - sta := btx.Bucket(BKTState) - // kindof hack, height is 36:40 - // also not really tip height... - if u.AtHeight > 0 { // if confirmed - err = sta.Put(KEYTipHeight, b[36:40]) - if err != nil { - return err - } - } - - // key : val is txid:everything else - return duf.Put(b[:36], b[36:]) - }) - if err != nil { - return err - } - return nil -} - func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error { // we write in key = outpoint (32 hash, 4 index) // value = spending txid @@ -370,68 +389,6 @@ func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error { }) } -// LoadFromDB loads everything in the db file into ram, rebuilding the TxStore -// (except the rootPrivKey, that should be done before calling this -- -// this will error if ts.rootPrivKey hasn't been loaded) -func (ts *TxStore) LoadFromDB() error { - if ts.rootPrivKey == nil { - return fmt.Errorf("LoadFromDB needs rootPrivKey loaded") - } - return ts.StateDB.View(func(btx *bolt.Tx) error { - duf := btx.Bucket(BKTUtxos) - if duf == nil { - return fmt.Errorf("no duffel bag") - } - spent := btx.Bucket(BKTStxos) - if spent == nil { - return fmt.Errorf("no spenttx bucket") - } - sta := btx.Bucket(BKTState) - if sta == nil { - return fmt.Errorf("no state bucket") - } - // first populate addresses from state bucket - numKeysBytes := sta.Get(KEYNumKeys) - if numKeysBytes != nil { // NumKeys exists, read into uint32 - buf := bytes.NewBuffer(numKeysBytes) - var numKeys uint32 - err := binary.Read(buf, binary.BigEndian, &numKeys) - if err != nil { - return err - } - fmt.Printf("db says %d keys\n", numKeys) - err = ts.PopulateAdrs(numKeys) - if err != nil { - return err - } - } - // next load all utxos from db into ram - duf.ForEach(func(k, v []byte) error { - // have to copy k and v here, otherwise append will crash it. - // not quite sure why but append does weird stuff I guess. - stx := spent.Get(k) - if stx == nil { // if it's not in the spent bucket - // create a new utxo - x := make([]byte, len(k)+len(v)) - copy(x, k) - copy(x[len(k):], v) - newU, err := UtxoFromBytes(x) - if err != nil { - return err - } - // and add it to ram - ts.Utxos = append(ts.Utxos, &newU) - ts.Sum += newU.Value - } else { - fmt.Printf("had utxo %x but spent by tx %x...\n", - k, stx[:8]) - } - return nil - }) - return nil - }) -} - // outPointToBytes turns an outpoint into 36 bytes. func outPointToBytes(op *wire.OutPoint) ([]byte, error) { var buf bytes.Buffer