some cleanup, store spent txs

pretty much everything is db based now.  Still some concurrency issues
when multiple block inv messages come in (which they do) where
we used len(mBlockQueue) as a check for being synched up, which doesn't
quite work.  Find a better way to do that...
This commit is contained in:
Tadge Dryja 2016-01-31 15:02:38 -08:00
parent cf01e02d64
commit f231113b90
4 changed files with 55 additions and 108 deletions

View File

@ -76,11 +76,11 @@ func OpenSPV(remoteNode string, hfn, tsfn string,
s.localVersion = VERSION s.localVersion = VERSION
// transaction store for this SPV connection // transaction store for this SPV connection
inTs.Param = p
err = inTs.OpenDB(tsfn) err = inTs.OpenDB(tsfn)
if err != nil { if err != nil {
return s, err return s, err
} }
inTs.Param = p
s.TS = inTs // copy pointer of txstore into spvcon s.TS = inTs // copy pointer of txstore into spvcon
myMsgVer, err := wire.NewMsgVersionFromConn(s.con, 0, 0) myMsgVer, err := wire.NewMsgVersionFromConn(s.con, 0, 0)

View File

@ -114,7 +114,6 @@ func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) {
if moar { if moar {
s.AskForHeaders() s.AskForHeaders()
} else { // no moar, done w/ headers, get merkleblocks } else { // no moar, done w/ headers, get merkleblocks
fmt.Printf("locks here...?? ")
s.headerMutex.Lock() s.headerMutex.Lock()
endPos, err := s.headerFile.Seek(0, os.SEEK_END) endPos, err := s.headerFile.Seek(0, os.SEEK_END)
if err != nil { if err != nil {
@ -164,7 +163,7 @@ func (s *SPVCon) InvHandler(m *wire.MsgInv) {
s.AskForTx(thing.Hash) s.AskForTx(thing.Hash)
} }
if thing.Type == wire.InvTypeBlock { // new block, ingest if thing.Type == wire.InvTypeBlock { // new block, ingest
if len(s.mBlockQueue) == 0 { if len(s.mBlockQueue) == 0 { // this is not a good check...
// don't ask directly; instead ask for header // don't ask directly; instead ask for header
fmt.Printf("asking for headers due to inv block\n") fmt.Printf("asking for headers due to inv block\n")
s.AskForHeaders() s.AskForHeaders()

View File

@ -60,15 +60,6 @@ func NewTxStore(rootkey *hdkeychain.ExtendedKey) TxStore {
return txs return txs
} }
// add addresses into the TxStore in memory
func (t *TxStore) AddAdr(a btcutil.Address, kidx uint32) {
var ma MyAdr
ma.PkhAdr = a
ma.KeyIdx = kidx
t.Adrs = append(t.Adrs, ma)
return
}
// add txid of interest // add txid of interest
func (t *TxStore) AddTxid(txid *wire.ShaHash, height int32) error { func (t *TxStore) AddTxid(txid *wire.ShaHash, height int32) error {
if txid == nil { if txid == nil {

View File

@ -26,12 +26,13 @@ var (
func (ts *TxStore) OpenDB(filename string) error { func (ts *TxStore) OpenDB(filename string) error {
var err error var err error
var numKeys uint32
ts.StateDB, err = bolt.Open(filename, 0644, nil) ts.StateDB, err = bolt.Open(filename, 0644, nil)
if err != nil { if err != nil {
return err return err
} }
// create buckets if they're not already there // create buckets if they're not already there
return ts.StateDB.Update(func(btx *bolt.Tx) error { err = ts.StateDB.Update(func(btx *bolt.Tx) error {
_, err = btx.CreateBucketIfNotExists(BKTUtxos) _, err = btx.CreateBucketIfNotExists(BKTUtxos)
if err != nil { if err != nil {
return err return err
@ -44,12 +45,37 @@ func (ts *TxStore) OpenDB(filename string) error {
if err != nil { if err != nil {
return err return err
} }
_, err = btx.CreateBucketIfNotExists(BKTState) sta, err := btx.CreateBucketIfNotExists(BKTState)
if err != nil { if err != nil {
return err return err
} }
numKeysBytes := sta.Get(KEYNumKeys)
if numKeysBytes != nil { // NumKeys exists, read into uint32
buf := bytes.NewBuffer(numKeysBytes)
err := binary.Read(buf, binary.BigEndian, &numKeys)
if err != nil {
return err
}
fmt.Printf("db says %d keys\n", numKeys)
} else { // no adrs yet, make it 1 (why...?)
numKeys = 1
var buf bytes.Buffer
err = binary.Write(&buf, binary.BigEndian, numKeys)
if err != nil {
return err
}
err = sta.Put(KEYNumKeys, buf.Bytes())
if err != nil {
return err
}
}
return nil return nil
}) })
if err != nil {
return err
}
return ts.PopulateAdrs(numKeys)
} }
// NewAdr creates a new, never before seen address, and increments the // NewAdr creates a new, never before seen address, and increments the
@ -85,7 +111,11 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) {
return nil, err return nil, err
} }
// add in to ram. // add in to ram.
ts.AddAdr(newAdr, n) var ma MyAdr
ma.PkhAdr = newAdr
ma.KeyIdx = n
ts.Adrs = append(ts.Adrs, ma)
return newAdr, nil return newAdr, nil
} }
@ -191,8 +221,11 @@ func (ts *TxStore) PopulateAdrs(lastKey uint32) error {
if err != nil { if err != nil {
return err return err
} }
var ma MyAdr
ma.PkhAdr = newAdr
ma.KeyIdx = k
ts.Adrs = append(ts.Adrs, ma)
ts.AddAdr(newAdr, k)
} }
return nil return nil
} }
@ -256,7 +289,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) {
// get all 4 buckets // get all 4 buckets
duf := btx.Bucket(BKTUtxos) duf := btx.Bucket(BKTUtxos)
// sta := btx.Bucket(BKTState) // sta := btx.Bucket(BKTState)
// old := btx.Bucket(BKTStxos) old := btx.Bucket(BKTStxos)
// txns := btx.Bucket(BKTTxns) // txns := btx.Bucket(BKTTxns)
// first see if we lose utxos // first see if we lose utxos
@ -280,11 +313,25 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) {
if err != nil { if err != nil {
return err return err
} }
// after deletion, save stxo to old bucket
var st Stxo // generate spent txo
st.Utxo = lostTxo // assign outpoint
st.SpendHeight = height // spent at height
st.SpendTxid = tx.TxSha() // spent by txid
stxb, err := st.ToBytes() // serialize
if err != nil {
return err
}
err = old.Put(k, stxb) // write k:v outpoint:stxo bytes
if err != nil {
return err
}
return nil // matched utxo k, won't match another return nil // matched utxo k, won't match another
} }
return nil // no match return nil // no match
}) })
} // done losing utxos } // done losing utxos, next gain utxos
// next add all new utxos to db, this is quick as the work is above // next add all new utxos to db, this is quick as the work is above
for _, ub := range nUtxoBytes { for _, ub := range nUtxoBytes {
err = duf.Put(ub[:36], ub[36:]) err = duf.Put(ub[:36], ub[36:])
@ -297,34 +344,6 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx) (uint32, error) {
return hits, err return hits, err
} }
// SaveToDB write a utxo to disk, overwriting an old utxo of the same outpoint
func (ts *TxStore) SaveUtxo(u *Utxo) error {
b, err := u.ToBytes()
if err != nil {
return err
}
err = ts.StateDB.Update(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
sta := btx.Bucket(BKTState)
// kindof hack, height is 36:40
// also not really tip height...
if u.AtHeight > 0 { // if confirmed
err = sta.Put(KEYTipHeight, b[36:40])
if err != nil {
return err
}
}
// key : val is txid:everything else
return duf.Put(b[:36], b[36:])
})
if err != nil {
return err
}
return nil
}
func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error { func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error {
// we write in key = outpoint (32 hash, 4 index) // we write in key = outpoint (32 hash, 4 index)
// value = spending txid // value = spending txid
@ -370,68 +389,6 @@ func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error {
}) })
} }
// LoadFromDB loads everything in the db file into ram, rebuilding the TxStore
// (except the rootPrivKey, that should be done before calling this --
// this will error if ts.rootPrivKey hasn't been loaded)
func (ts *TxStore) LoadFromDB() error {
if ts.rootPrivKey == nil {
return fmt.Errorf("LoadFromDB needs rootPrivKey loaded")
}
return ts.StateDB.View(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
if duf == nil {
return fmt.Errorf("no duffel bag")
}
spent := btx.Bucket(BKTStxos)
if spent == nil {
return fmt.Errorf("no spenttx bucket")
}
sta := btx.Bucket(BKTState)
if sta == nil {
return fmt.Errorf("no state bucket")
}
// first populate addresses from state bucket
numKeysBytes := sta.Get(KEYNumKeys)
if numKeysBytes != nil { // NumKeys exists, read into uint32
buf := bytes.NewBuffer(numKeysBytes)
var numKeys uint32
err := binary.Read(buf, binary.BigEndian, &numKeys)
if err != nil {
return err
}
fmt.Printf("db says %d keys\n", numKeys)
err = ts.PopulateAdrs(numKeys)
if err != nil {
return err
}
}
// next load all utxos from db into ram
duf.ForEach(func(k, v []byte) error {
// have to copy k and v here, otherwise append will crash it.
// not quite sure why but append does weird stuff I guess.
stx := spent.Get(k)
if stx == nil { // if it's not in the spent bucket
// create a new utxo
x := make([]byte, len(k)+len(v))
copy(x, k)
copy(x[len(k):], v)
newU, err := UtxoFromBytes(x)
if err != nil {
return err
}
// and add it to ram
ts.Utxos = append(ts.Utxos, &newU)
ts.Sum += newU.Value
} else {
fmt.Printf("had utxo %x but spent by tx %x...\n",
k, stx[:8])
}
return nil
})
return nil
})
}
// outPointToBytes turns an outpoint into 36 bytes. // outPointToBytes turns an outpoint into 36 bytes.
func outPointToBytes(op *wire.OutPoint) ([]byte, error) { func outPointToBytes(op *wire.OutPoint) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer