From d9afd623eb746c1dedcbf80c20bd7be7e1514ff4 Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Wed, 27 Jan 2016 01:24:16 -0800 Subject: [PATCH] new stxo struct and more db methods I'm getting away from having both in-ram and on-disk stores for the transaction store data. it should all be on disk, it's safer that way. It might be slower but this will not process many txs / second anyway. --- uspv/eight333.go | 26 ++++++- uspv/msghandler.go | 31 +++++--- uspv/txstore.go | 30 ++++++-- uspv/utxodb.go | 187 ++++++++++++++++++++++++++++++++++++++------- 4 files changed, 225 insertions(+), 49 deletions(-) diff --git a/uspv/eight333.go b/uspv/eight333.go index 548f450a..a8e592d5 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -222,10 +222,19 @@ func (s *SPVCon) AskForTx(txid wire.ShaHash) { // We don't have it in our header file so when we get it we do both operations: // appending and checking the header, and checking spv proofs func (s *SPVCon) AskForBlock(hsh wire.ShaHash) { + + fmt.Printf("mBlockQueue len %d\n", len(s.mBlockQueue)) + // wait until all mblocks are done before adding + for len(s.mBlockQueue) != 0 { + // fmt.Printf("mBlockQueue len %d\n", len(s.mBlockQueue)) + } + gdata := wire.NewMsgGetData() inv := wire.NewInvVect(wire.InvTypeFilteredBlock, &hsh) gdata.AddInvVect(inv) + // TODO - wait until headers are sync'd before checking height + info, err := s.headerFile.Stat() // get if err != nil { log.Fatal(err) // crash if header file disappears @@ -437,7 +446,11 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { } fmt.Printf("will request merkleblocks %d to %d\n", current, last) // track number of utxos - track := len(s.TS.Utxos) + track, err := s.TS.NumUtxos() + if err != nil { + return err + } + // create initial filter filt, err := s.TS.GimmeFilter() if err != nil { @@ -454,15 +467,20 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error { // loop through all heights where we want merkleblocks. for current < last { // check if we need to update filter... diff of 5 utxos...? - if track < len(s.TS.Utxos)-4 || track > len(s.TS.Utxos)+4 { - track = len(s.TS.Utxos) + nTrack, err := s.TS.NumUtxos() + if err != nil { + return err + } + + if track < nTrack-4 || track > nTrack+4 { + track = nTrack filt, err := s.TS.GimmeFilter() if err != nil { return err } s.SendFilter(filt) - fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter) + fmt.Printf("sent %d byte filter\n", len(filt.MsgFilterLoad().Filter)) } // load header from file diff --git a/uspv/msghandler.go b/uspv/msghandler.go index 58d15093..a73e872b 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -54,18 +54,14 @@ func (s *SPVCon) incomingMessageHandler() { log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s", m.Cmd, m.Code.String(), m.Hash.String(), m.Reason) case *wire.MsgInv: - log.Printf("got inv. Contains:\n") + go s.InvHandler(m) + + case *wire.MsgNotFound: + log.Printf("Got not found response from remote:") for i, thing := range m.InvList { - log.Printf("\t%d)%s : %s", - i, thing.Type.String(), thing.Hash.String()) - if thing.Type == wire.InvTypeTx { // new tx, ingest - s.TS.OKTxids[thing.Hash] = 0 // unconfirmed - s.AskForTx(thing.Hash) - } - if thing.Type == wire.InvTypeBlock { // new block, ingest - s.AskForBlock(thing.Hash) - } + log.Printf("\t$d) %s: %s", i, thing.Type, thing.Hash) } + default: log.Printf("Got unknown message type %s\n", m.Command()) } @@ -86,3 +82,18 @@ func (s *SPVCon) outgoingMessageHandler() { } return } + +func (s *SPVCon) InvHandler(m *wire.MsgInv) { + log.Printf("got inv. Contains:\n") + for i, thing := range m.InvList { + log.Printf("\t%d)%s : %s", + i, thing.Type.String(), thing.Hash.String()) + if thing.Type == wire.InvTypeTx { // new tx, ingest + s.TS.OKTxids[thing.Hash] = 0 // unconfirmed + s.AskForTx(thing.Hash) + } + if thing.Type == wire.InvTypeBlock { // new block, ingest + s.AskForBlock(thing.Hash) + } + } +} diff --git a/uspv/txstore.go b/uspv/txstore.go index afe3561e..c768567a 100644 --- a/uspv/txstore.go +++ b/uspv/txstore.go @@ -32,12 +32,21 @@ type TxStore struct { } type Utxo struct { // cash money. + Op wire.OutPoint // where + // all the info needed to spend AtHeight int32 // block height where this tx was confirmed, 0 for unconf KeyIdx uint32 // index for private key needed to sign / spend Value int64 // higher is better - Op wire.OutPoint // where + // IsCoinbase bool // can't spend for a while +} + +// Stxo is a utxo that has moved on. +type Stxo struct { + Utxo // when it used to be a utxo + SpendHeight int32 // height at which it met its demise + SpendTxid wire.ShaHash // the tx that consumed it } type MyAdr struct { // an address I have the private key for @@ -79,7 +88,12 @@ func (t *TxStore) GimmeFilter() (*bloom.Filter, error) { return nil, fmt.Errorf("no addresses to filter for") } // add addresses to look for incoming - elem := uint32(len(t.Adrs) + len(t.Utxos)) + nutxo, err := t.NumUtxos() + if err != nil { + return nil, err + } + + elem := uint32(len(t.Adrs)) + nutxo f := bloom.NewFilter(elem, 0, 0.001, wire.BloomUpdateAll) for _, a := range t.Adrs { f.Add(a.PkhAdr.ScriptAddress()) @@ -121,7 +135,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { newTxid := tx.TxSha() var hits uint32 // how many outputs of this tx are ours var acq int64 // total acquirement from this tx - // check if any of the tx's outputs match my adrs + // check if any of the tx's outputs match my known outpoints for i, out := range tx.TxOut { // in each output of tx dup := false // start by assuming its new until found duplicate newOp := wire.NewOutPoint(&newTxid, uint32(i)) @@ -133,7 +147,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { fmt.Printf(" %s is dupe\t", newOp.String()) u.AtHeight = height // ONLY difference is height // save modified utxo to db, overwriting old one - err := u.SaveToDB(t.StateDB) + err := t.SaveUtxo(u) if err != nil { return err } @@ -145,15 +159,15 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { // when it matches an address, just go to the next outpoint continue } + // check if this is a new txout matching one of my addresses for _, a := range t.Adrs { // compare to each adr we have // check for full script to eliminate false positives aPKscript, err := txscript.PayToAddrScript(a.PkhAdr) if err != nil { return err } - // already checked for dupes, this must be a new outpoint if bytes.Equal(out.PkScript, aPKscript) { // hit - + // already checked for dupes, so this must be a new outpoint var newu Utxo newu.AtHeight = height newu.KeyIdx = a.KeyIdx @@ -163,7 +177,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error { newop.Hash = tx.TxSha() newop.Index = uint32(i) newu.Op = newop - err = newu.SaveToDB(t.StateDB) + err = t.SaveUtxo(&newu) if err != nil { return err } @@ -193,7 +207,7 @@ func (t *TxStore) ExpellTx(tx *wire.MsgTx, height int32) error { if OutPointsEqual(myutxo.Op, in.PreviousOutPoint) { hits++ loss += myutxo.Value - err := t.MarkSpent(&myutxo.Op, height, tx) + err := t.MarkSpent(*myutxo, height, tx) if err != nil { return err } diff --git a/uspv/utxodb.go b/uspv/utxodb.go index aee9d9ff..d6d5763b 100644 --- a/uspv/utxodb.go +++ b/uspv/utxodb.go @@ -14,7 +14,8 @@ import ( var ( BKTUtxos = []byte("DuffelBag") // leave the rest to collect interest - BKTOld = []byte("SpentTxs") // for bookkeeping + BKTStxos = []byte("SpentTxs") // for bookkeeping + BKTTxns = []byte("Txns") // all txs we care about, for replays BKTState = []byte("MiscState") // last state of DB KEYNumKeys = []byte("NumKeys") // number of keys used @@ -27,16 +28,20 @@ func (ts *TxStore) OpenDB(filename string) error { return err } // create buckets if they're not already there - return ts.StateDB.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists(BKTUtxos) + return ts.StateDB.Update(func(btx *bolt.Tx) error { + _, err = btx.CreateBucketIfNotExists(BKTUtxos) if err != nil { return err } - _, err = tx.CreateBucketIfNotExists(BKTOld) + _, err = btx.CreateBucketIfNotExists(BKTStxos) if err != nil { return err } - _, err = tx.CreateBucketIfNotExists(BKTState) + _, err = btx.CreateBucketIfNotExists(BKTTxns) + if err != nil { + return err + } + _, err = btx.CreateBucketIfNotExists(BKTState) if err != nil { return err } @@ -69,8 +74,8 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) { } // write to db file - err = ts.StateDB.Update(func(tx *bolt.Tx) error { - stt := tx.Bucket(BKTState) + err = ts.StateDB.Update(func(btx *bolt.Tx) error { + stt := btx.Bucket(BKTState) return stt.Put(KEYNumKeys, buf.Bytes()) }) if err != nil { @@ -81,6 +86,24 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) { return newAdr, nil } +// NumUtxos returns the number of utxos in the DB. +func (ts *TxStore) NumUtxos() (uint32, error) { + var n uint32 + err := ts.StateDB.View(func(btx *bolt.Tx) error { + duf := btx.Bucket(BKTUtxos) + if duf == nil { + return fmt.Errorf("no duffel bag") + } + stats := duf.Stats() + n = uint32(stats.KeyN) + return nil + }) + if err != nil { + return 0, err + } + return n, nil +} + // PopulateAdrs just puts a bunch of adrs in ram; it doesn't touch the DB func (ts *TxStore) PopulateAdrs(lastKey uint32) error { for k := uint32(0); k < lastKey; k++ { @@ -101,10 +124,9 @@ func (ts *TxStore) PopulateAdrs(lastKey uint32) error { } // SaveToDB write a utxo to disk, overwriting an old utxo of the same outpoint -func (u *Utxo) SaveToDB(dbx *bolt.DB) error { - - err := dbx.Update(func(tx *bolt.Tx) error { - duf := tx.Bucket(BKTUtxos) +func (ts *TxStore) SaveUtxo(u *Utxo) error { + err := ts.StateDB.Update(func(btx *bolt.Tx) error { + duf := btx.Bucket(BKTUtxos) b, err := u.ToBytes() if err != nil { return err @@ -124,26 +146,47 @@ func (u *Utxo) SaveToDB(dbx *bolt.DB) error { return nil } -func (ts *TxStore) MarkSpent(op *wire.OutPoint, h int32, stx *wire.MsgTx) error { +func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error { // we write in key = outpoint (32 hash, 4 index) // value = spending txid // if we care about the spending tx we can store that in another bucket. - return ts.StateDB.Update(func(tx *bolt.Tx) error { - old := tx.Bucket(BKTOld) - opb, err := outPointToBytes(op) + + var st Stxo + st.Utxo = ut + st.SpendHeight = h + st.SpendTxid = stx.TxSha() + + return ts.StateDB.Update(func(btx *bolt.Tx) error { + duf := btx.Bucket(BKTUtxos) + old := btx.Bucket(BKTStxos) + txns := btx.Bucket(BKTTxns) + + opb, err := outPointToBytes(&st.Op) if err != nil { return err } - var buf bytes.Buffer - err = binary.Write(&buf, binary.BigEndian, h) + + err = duf.Delete(opb) // not utxo anymore if err != nil { return err } + + stxb, err := st.ToBytes() + if err != nil { + return err + } + + err = old.Put(opb, stxb) // write k:v outpoint:stxo bytes + if err != nil { + return err + } + + // store spending tx sha := stx.TxSha() - err = old.Put(opb, sha.Bytes()) // write k:v outpoint:txid - if err != nil { - return err - } + var buf bytes.Buffer + stx.Serialize(&buf) + txns.Put(sha.Bytes(), buf.Bytes()) + return nil }) } @@ -155,16 +198,16 @@ func (ts *TxStore) LoadFromDB() error { if ts.rootPrivKey == nil { return fmt.Errorf("LoadFromDB needs rootPrivKey loaded") } - return ts.StateDB.View(func(tx *bolt.Tx) error { - duf := tx.Bucket(BKTUtxos) + return ts.StateDB.View(func(btx *bolt.Tx) error { + duf := btx.Bucket(BKTUtxos) if duf == nil { return fmt.Errorf("no duffel bag") } - spent := tx.Bucket(BKTOld) + spent := btx.Bucket(BKTStxos) if spent == nil { return fmt.Errorf("no spenttx bucket") } - state := tx.Bucket(BKTState) + state := btx.Bucket(BKTState) if state == nil { return fmt.Errorf("no state bucket") } @@ -268,8 +311,8 @@ func UtxoFromBytes(b []byte) (Utxo, error) { return u, fmt.Errorf("nil input slice") } buf := bytes.NewBuffer(b) - if buf.Len() < 52 { // minimum 52 bytes with no pkscript - return u, fmt.Errorf("Got %d bytes for sender, expect > 52", buf.Len()) + if buf.Len() < 52 { // utxos are 52 bytes + return u, fmt.Errorf("Got %d bytes for utxo, expect 52", buf.Len()) } // read 32 byte txid err := u.Op.Hash.SetBytes(buf.Next(32)) @@ -298,3 +341,93 @@ func UtxoFromBytes(b []byte) (Utxo, error) { } return u, nil } + +// ToBytes turns an Stxo into some bytes. +// outpoint txid, outpoint idx, height, key idx, amt, spendheight, spendtxid +func (s *Stxo) ToBytes() ([]byte, error) { + var buf bytes.Buffer + // write 32 byte txid of the utxo + _, err := buf.Write(s.Op.Hash.Bytes()) + if err != nil { + return nil, err + } + // write 4 byte outpoint index within the tx to spend + err = binary.Write(&buf, binary.BigEndian, s.Op.Index) + if err != nil { + return nil, err + } + // write 4 byte height of utxo + err = binary.Write(&buf, binary.BigEndian, s.AtHeight) + if err != nil { + return nil, err + } + // write 4 byte key index of utxo + err = binary.Write(&buf, binary.BigEndian, s.KeyIdx) + if err != nil { + return nil, err + } + // write 8 byte amount of money at the utxo + err = binary.Write(&buf, binary.BigEndian, s.Value) + if err != nil { + return nil, err + } + // write 4 byte height where the txo was spent + err = binary.Write(&buf, binary.BigEndian, s.SpendHeight) + if err != nil { + return nil, err + } + // write 32 byte txid of the spending transaction + _, err = buf.Write(s.SpendTxid.Bytes()) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// StxoFromBytes turns bytes into a Stxo. +func StxoFromBytes(b []byte) (Stxo, error) { + var s Stxo + if b == nil { + return s, fmt.Errorf("nil input slice") + } + buf := bytes.NewBuffer(b) + if buf.Len() < 88 { // stxos are 88 bytes + return s, fmt.Errorf("Got %d bytes for stxo, expect 88", buf.Len()) + } + // read 32 byte txid + err := s.Op.Hash.SetBytes(buf.Next(32)) + if err != nil { + return s, err + } + // read 4 byte outpoint index within the tx to spend + err = binary.Read(buf, binary.BigEndian, &s.Op.Index) + if err != nil { + return s, err + } + // read 4 byte height of utxo + err = binary.Read(buf, binary.BigEndian, &s.AtHeight) + if err != nil { + return s, err + } + // read 4 byte key index of utxo + err = binary.Read(buf, binary.BigEndian, &s.KeyIdx) + if err != nil { + return s, err + } + // read 8 byte amount of money at the utxo + err = binary.Read(buf, binary.BigEndian, &s.Value) + if err != nil { + return s, err + } + // read 4 byte spend height + err = binary.Read(buf, binary.BigEndian, &s.SpendHeight) + if err != nil { + return s, err + } + // read 32 byte txid + err = s.SpendTxid.SetBytes(buf.Next(32)) + if err != nil { + return s, err + } + return s, nil +}