From d2d37ce1abe430b73cc3d3fb1becf8a180136123 Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Thu, 25 Feb 2016 21:05:01 -0800 Subject: [PATCH] make it way faster there were a lot of dumb things in Ingest() that made it really slow. Like re-hashing the tx a bunch of times. And re-saving it to the db redundantly. also added local bloom filters. Maybe some concurrency issues if you generate an address just as you're getting a tx with that address but that doesn't seem like a real problem. Cheap to rescan anyway. So it's faster and works better. --- uspv/hardmode.go | 62 ++++++++++++++++++++++++++++++++++++++-------- uspv/init.go | 7 ++++++ uspv/msghandler.go | 28 ++++++++++++--------- uspv/txstore.go | 2 ++ uspv/utxodb.go | 34 +++++++++++-------------- 5 files changed, 91 insertions(+), 42 deletions(-) diff --git a/uspv/hardmode.go b/uspv/hardmode.go index b54e8526..445d1029 100644 --- a/uspv/hardmode.go +++ b/uspv/hardmode.go @@ -6,6 +6,8 @@ import ( "log" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcutil/bloom" ) var ( @@ -115,6 +117,27 @@ func calcRoot(hashes []*wire.ShaHash) *wire.ShaHash { return hashes[0] } +func (ts *TxStore) Refilter() error { + allUtxos, err := ts.GetAllUtxos() + if err != nil { + return err + } + filterElements := uint32(len(allUtxos) + len(ts.Adrs)) + + ts.localFilter = bloom.NewFilter(filterElements, 0, 0, wire.BloomUpdateAll) + + for _, u := range allUtxos { + ts.localFilter.AddOutPoint(&u.Op) + } + for _, a := range ts.Adrs { + ts.localFilter.Add(a.PkhAdr.ScriptAddress()) + } + + msg := ts.localFilter.MsgFilterLoad() + fmt.Printf("made %d element filter: %x\n", filterElements, msg.Filter) + return nil +} + // IngestBlock is like IngestMerkleBlock but aralphic // different enough that it's better to have 2 separate functions func (s *SPVCon) IngestBlock(m *wire.MsgBlock) { @@ -148,21 +171,39 @@ func (s *SPVCon) IngestBlock(m *wire.MsgBlock) { return } + fPositive := 0 // local filter false positives + reFilter := 10 // after that many false positives, regenerate filter. + // 10? Making it up. False positives have disk i/o cost, and regenning + // the filter also has costs. With a large local filter, false positives + // should be rare. + // iterate through all txs in the block, looking for matches. - // this is slow and can be sped up by doing in-ram filters client side. - // kindof a pain to implement though and it's fast enough for now. + // use a local bloom filter to ignore txs that don't affect us for i, tx := range m.Transactions { - hits, err := s.TS.Ingest(tx, hah.height) - if err != nil { - log.Printf("Incoming Tx error: %s\n", err.Error()) - return - } - if hits > 0 { - log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.", - hah.height, i, tx.TxSha().String(), hits) + utilTx := btcutil.NewTx(tx) + if s.TS.localFilter.MatchTxAndUpdate(utilTx) { + hits, err := s.TS.Ingest(tx, hah.height) + if err != nil { + log.Printf("Incoming Tx error: %s\n", err.Error()) + return + } + if hits > 0 { + log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.", + hah.height, i, tx.TxSha().String(), hits) + } else { + fPositive++ // matched filter but no hits + } } } + if fPositive > reFilter { + fmt.Printf("%d filter false positives in this block\n", fPositive) + err = s.TS.Refilter() + if err != nil { + log.Printf("Refilter error: %s\n", err.Error()) + return + } + } // write to db that we've sync'd to the height indicated in the // merkle block. This isn't QUITE true since we haven't actually gotten // the txs yet but if there are problems with the txs we should backtrack. @@ -171,6 +212,7 @@ func (s *SPVCon) IngestBlock(m *wire.MsgBlock) { log.Printf("full block sync error: %s\n", err.Error()) return } + fmt.Printf("ingested full block %s height %d OK\n", m.Header.BlockSha().String(), hah.height) diff --git a/uspv/init.go b/uspv/init.go index 4fb3844a..0e723daa 100644 --- a/uspv/init.go +++ b/uspv/init.go @@ -91,6 +91,13 @@ func OpenSPV(remoteNode string, hfn, dbfn string, s.inWaitState = make(chan bool, 1) go s.fPositiveHandler() + if hard { + err = s.TS.Refilter() + if err != nil { + return s, err + } + } + return s, nil } diff --git a/uspv/msghandler.go b/uspv/msghandler.go index e8506289..2365f993 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -5,6 +5,7 @@ import ( "log" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" ) func (s *SPVCon) incomingMessageHandler() { @@ -157,19 +158,22 @@ func (s *SPVCon) TxHandler(m *wire.MsgTx) { // i, dub.String(), m.TxSha().String()) // } // } - hits, err := s.TS.Ingest(m, height) - if err != nil { - log.Printf("Incoming Tx error: %s\n", err.Error()) - return + utilTx := btcutil.NewTx(m) + if !s.HardMode || s.TS.localFilter.MatchTxAndUpdate(utilTx) { + hits, err := s.TS.Ingest(m, height) + if err != nil { + log.Printf("Incoming Tx error: %s\n", err.Error()) + return + } + if hits == 0 && !s.HardMode { + log.Printf("tx %s had no hits, filter false positive.", + m.TxSha().String()) + s.fPositives <- 1 // add one false positive to chan + return + } + log.Printf("tx %s ingested and matches %d utxo/adrs.", + m.TxSha().String(), hits) } - if hits == 0 && !s.HardMode { - log.Printf("tx %s had no hits, filter false positive.", - m.TxSha().String()) - s.fPositives <- 1 // add one false positive to chan - return - } - log.Printf("tx %s ingested and matches %d utxo/adrs.", - m.TxSha().String(), hits) } // GetDataHandler responds to requests for tx data, which happen after diff --git a/uspv/txstore.go b/uspv/txstore.go index a4b8aadd..acf74f9f 100644 --- a/uspv/txstore.go +++ b/uspv/txstore.go @@ -23,6 +23,8 @@ type TxStore struct { Adrs []MyAdr // endeavouring to acquire capital StateDB *bolt.DB // place to write all this down + localFilter *bloom.Filter // local bloom filter for hard mode + // Params live here, not SCon Param *chaincfg.Params // network parameters (testnet3, testnetL) diff --git a/uspv/utxodb.go b/uspv/utxodb.go index 8fd19489..96f1115a 100644 --- a/uspv/utxodb.go +++ b/uspv/utxodb.go @@ -125,7 +125,7 @@ func (ts *TxStore) NewAdr() (btcutil.Address, error) { ma.KeyIdx = n ts.Adrs = append(ts.Adrs, ma) - + ts.localFilter.Add(ma.PkhAdr.ScriptAddress()) return nAdr, nil } @@ -399,16 +399,15 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { } } + cachedSha := tx.TxSha() // iterate through all outputs of this tx, see if we gain for i, out := range tx.TxOut { for j, ascr := range aPKscripts { - // detect p2wpkh witBool := false if bytes.Equal(out.PkScript, wPKscripts[j]) { witBool = true } - if bytes.Equal(out.PkScript, ascr) || witBool { // new utxo found var newu Utxo // create new utxo and copy into it newu.AtHeight = height @@ -416,7 +415,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { newu.Value = out.Value newu.IsWit = witBool // copy witness version from pkscript var newop wire.OutPoint - newop.Hash = tx.TxSha() + newop.Hash = cachedSha newop.Index = uint32(i) newu.Op = newop b, err := newu.ToBytes() @@ -459,7 +458,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { var st Stxo // generate spent txo st.Utxo = lostTxo // assign outpoint st.SpendHeight = height // spent at height - st.SpendTxid = tx.TxSha() // spent by txid + st.SpendTxid = cachedSha // spent by txid stxb, err := st.ToBytes() // serialize if err != nil { return err @@ -468,14 +467,7 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { if err != nil { return err } - // store this relevant tx - sha := tx.TxSha() - var buf bytes.Buffer - tx.SerializeWitness(&buf) // always store witness version - err = txns.Put(sha.Bytes(), buf.Bytes()) - if err != nil { - return err - } + err = duf.Delete(nOP) if err != nil { return err @@ -483,13 +475,6 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { } } - //delete everything even if it doesn't exist! - // for _, dOP := range spentOPs { - // err = duf.Delete(dOP) - // if err != nil { - // return err - // } - // } // done losing utxos, next gain utxos // next add all new utxos to db, this is quick as the work is above for _, ub := range nUtxoBytes { @@ -498,6 +483,15 @@ func (ts *TxStore) Ingest(tx *wire.MsgTx, height int32) (uint32, error) { return err } } + + // if hits is nonzero it's a relevant tx and we should store it + var buf bytes.Buffer + tx.SerializeWitness(&buf) // always store witness version + err = txns.Put(cachedSha.Bytes(), buf.Bytes()) + if err != nil { + return err + } + return nil }) return hits, err