From 4535b965cd0e2ebaaaaaa20dacd861108b2aa654 Mon Sep 17 00:00:00 2001 From: Tadge Dryja Date: Sat, 6 Feb 2016 22:48:54 -0800 Subject: [PATCH] implement hard mode seems to work OK. Could be sped up by local filters instead of ingesting every tx from the block but I can do that later if performance (mostly disk i/o) is an issue once multiple connections are implemented, hard mode should forward txs to blend in. --- shell.go | 2 +- uspv/eight333.go | 34 ++++++++++++-------- uspv/hardmode.go | 79 +++++++++++++++++++++++++++++++++++----------- uspv/init.go | 6 ++-- uspv/msghandler.go | 22 ++++++------- 5 files changed, 94 insertions(+), 49 deletions(-) diff --git a/shell.go b/shell.go index a9a57cda..551b3364 100644 --- a/shell.go +++ b/shell.go @@ -51,7 +51,7 @@ func shell() { // setup spvCon SCon, err = uspv.OpenSPV( - SPVHostAdr, headerFileName, dbFileName, &Store, Params) + SPVHostAdr, headerFileName, dbFileName, &Store, true, Params) if err != nil { log.Fatal(err) } diff --git a/uspv/eight333.go b/uspv/eight333.go index 5246ae9d..a5001052 100644 --- a/uspv/eight333.go +++ b/uspv/eight333.go @@ -45,7 +45,7 @@ type SPVCon struct { TS *TxStore // transaction store to write to // mBlockQueue is for keeping track of what height we've requested. - mBlockQueue chan HashAndHeight + blockQueue chan HashAndHeight // fPositives is a channel to keep track of bloom filter false positives. fPositives chan int32 @@ -96,6 +96,7 @@ func (s *SPVCon) RemoveHeaders(r int32) error { } func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) { + txids, err := checkMBlock(m) // check self-consistency if err != nil { log.Printf("Merkle block error: %s\n", err.Error()) @@ -103,7 +104,7 @@ func (s *SPVCon) IngestMerkleBlock(m *wire.MsgMerkleBlock) { } var hah HashAndHeight select { // select here so we don't block on an unrequested mblock - case hah = <-s.mBlockQueue: // pop height off mblock queue + case hah = <-s.blockQueue: // pop height off mblock queue break default: log.Printf("Unrequested merkle block") @@ -282,7 +283,7 @@ func (s *SPVCon) AskForHeaders() error { // AskForMerkBlocks requests blocks from current to last // right now this asks for 1 block per getData message. // Maybe it's faster to ask for many in a each message? -func (s *SPVCon) AskForMerkBlocks() error { +func (s *SPVCon) AskForBlocks() error { var hdr wire.BlockHeader s.headerMutex.Lock() // lock just to check filesize @@ -311,15 +312,16 @@ func (s *SPVCon) AskForMerkBlocks() error { fmt.Printf("will request merkleblocks %d to %d\n", dbTip, headerTip) - // create initial filter - filt, err := s.TS.GimmeFilter() - if err != nil { - return err + if !s.HardMode { // don't send this in hardmode! that's the whole point + // create initial filter + filt, err := s.TS.GimmeFilter() + if err != nil { + return err + } + // send filter + s.SendFilter(filt) + fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter) } - // send filter - s.SendFilter(filt) - fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter) - // loop through all heights where we want merkleblocks. for dbTip <= headerTip { // load header from file @@ -338,7 +340,13 @@ func (s *SPVCon) AskForMerkBlocks() error { bHash := hdr.BlockSha() // create inventory we're asking for - iv1 := wire.NewInvVect(wire.InvTypeFilteredBlock, &bHash) + iv1 := new(wire.InvVect) + // if hardmode, ask for legit blocks, none of this ralphy stuff + if s.HardMode { + iv1 = wire.NewInvVect(wire.InvTypeBlock, &bHash) + } else { // ah well + iv1 = wire.NewInvVect(wire.InvTypeFilteredBlock, &bHash) + } gdataMsg := wire.NewMsgGetData() // add inventory err = gdataMsg.AddInvVect(iv1) @@ -351,7 +359,7 @@ func (s *SPVCon) AskForMerkBlocks() error { } s.outMsgQueue <- gdataMsg // waits here most of the time for the queue to empty out - s.mBlockQueue <- hah // push height and mroot of requested block on queue + s.blockQueue <- hah // push height and mroot of requested block on queue dbTip++ } return nil diff --git a/uspv/hardmode.go b/uspv/hardmode.go index 92d07e12..20d5118a 100644 --- a/uspv/hardmode.go +++ b/uspv/hardmode.go @@ -11,43 +11,84 @@ import ( // BlockRootOK checks that all the txs in the block match the merkle root. // Only checks merkle root; it doesn't look at txs themselves. func BlockRootOK(blk wire.MsgBlock) bool { - fmt.Printf("BlockRootOK for block %s\n", blk.BlockSha().String()) - var shas []*wire.ShaHash for _, tx := range blk.Transactions { // make slice of txids nSha := tx.TxSha() shas = append(shas, &nSha) } - - // pad out tx slice to get the full tree base neededLen := int(nextPowerOfTwo(uint32(len(shas)))) // kindof ugly for len(shas) < neededLen { - shas = append(shas, nil) + shas = append(shas, nil) // pad out tx slice to get the full tree base } - fmt.Printf("Padded %d txs in block to %d\n", - len(blk.Transactions), len(shas)) - - // calculate merkle root. Terse, eh? - for len(shas) > 1 { + for len(shas) > 1 { // calculate merkle root. Terse, eh? shas = append(shas[2:], MakeMerkleParent(shas[0], shas[1])) } - - fmt.Printf("calc'd mroot %s, %s in header\n", - shas[0].String(), blk.Header.MerkleRoot.String()) - - if blk.Header.MerkleRoot.IsEqual(shas[0]) { - return true - } - return false + return blk.Header.MerkleRoot.IsEqual(shas[0]) } +// IngestBlock is like IngestMerkleBlock but aralphic +// different enough that it's better to have 2 separate functions func (s *SPVCon) IngestBlock(m *wire.MsgBlock) { - ok := BlockRootOK(*m) // check self-consistency + var err error + + ok := BlockRootOK(*m) // check block self-consistency if !ok { fmt.Printf("block %s not OK!!11\n", m.BlockSha().String()) return } + var hah HashAndHeight + select { // select here so we don't block on an unrequested mblock + case hah = <-s.blockQueue: // pop height off mblock queue + break + default: + log.Printf("Unrequested full block") + return + } + + newBlockSha := m.Header.BlockSha() + if !hah.blockhash.IsEqual(&newBlockSha) { + log.Printf("full block out of order error") + return + } + + // iterate through all txs in the block, looking for matches. + // this is slow and can be sped up by doing in-ram filters client side. + // kindof a pain to implement though and it's fast enough for now. + for i, tx := range m.Transactions { + hits, err := s.TS.Ingest(tx, hah.height) + if err != nil { + log.Printf("Incoming Tx error: %s\n", err.Error()) + return + } + if hits > 0 { + log.Printf("block %d tx %d %s ingested and matches %d utxo/adrs.", + hah.height, i, tx.TxSha().String(), hits) + } + } + + // write to db that we've sync'd to the height indicated in the + // merkle block. This isn't QUITE true since we haven't actually gotten + // the txs yet but if there are problems with the txs we should backtrack. + err = s.TS.SetDBSyncHeight(hah.height) + if err != nil { + log.Printf("full block sync error: %s\n", err.Error()) + return + } + fmt.Printf("ingested full block %s height %d OK\n", + m.Header.BlockSha().String(), hah.height) + + if hah.final { // check sync end + // don't set waitstate; instead, ask for headers again! + // this way the only thing that triggers waitstate is asking for headers, + // getting 0, calling AskForMerkBlocks(), and seeing you don't need any. + // that way you are pretty sure you're synced up. + err = s.AskForHeaders() + if err != nil { + log.Printf("Merkle block error: %s\n", err.Error()) + return + } + } return } diff --git a/uspv/init.go b/uspv/init.go index 5816307f..59dce7f1 100644 --- a/uspv/init.go +++ b/uspv/init.go @@ -13,10 +13,10 @@ import ( // OpenPV starts a func OpenSPV(remoteNode string, hfn, dbfn string, - inTs *TxStore, p *chaincfg.Params) (SPVCon, error) { + inTs *TxStore, hard bool, p *chaincfg.Params) (SPVCon, error) { // create new SPVCon var s SPVCon - + s.HardMode = hard // I should really merge SPVCon and TxStore, they're basically the same inTs.Param = p s.TS = inTs // copy pointer of txstore into spvcon @@ -83,7 +83,7 @@ func OpenSPV(remoteNode string, hfn, dbfn string, go s.incomingMessageHandler() s.outMsgQueue = make(chan wire.Message) go s.outgoingMessageHandler() - s.mBlockQueue = make(chan HashAndHeight, 32) // queue depth 32 is a thing + s.blockQueue = make(chan HashAndHeight, 32) // queue depth 32 is a thing s.fPositives = make(chan int32, 4000) // a block full, approx s.inWaitState = make(chan bool, 1) go s.fPositiveHandler() diff --git a/uspv/msghandler.go b/uspv/msghandler.go index 1336a02b..90f26689 100644 --- a/uspv/msghandler.go +++ b/uspv/msghandler.go @@ -30,6 +30,8 @@ func (s *SPVCon) incomingMessageHandler() { go s.PongBack(m.Nonce) case *wire.MsgPong: log.Printf("Got a pong response. OK.\n") + case *wire.MsgBlock: + s.IngestBlock(m) case *wire.MsgMerkleBlock: s.IngestMerkleBlock(m) case *wire.MsgHeaders: // concurrent because we keep asking for blocks @@ -48,8 +50,7 @@ func (s *SPVCon) incomingMessageHandler() { } case *wire.MsgGetData: s.GetDataHandler(m) - case *wire.MsgBlock: - s.IngestBlock(m) + default: log.Printf("Got unknown message type %s\n", m.Command()) } @@ -113,15 +114,11 @@ func (s *SPVCon) HeaderHandler(m *wire.MsgHeaders) { } return } - // no moar, done w/ headers, get merkleblocks - if s.HardMode { // in hard mode ask for regular blocks. - - } else { - err = s.AskForMerkBlocks() - if err != nil { - log.Printf("AskForMerkBlocks error: %s", err.Error()) - return - } + // no moar, done w/ headers, get blocks + err = s.AskForBlocks() + if err != nil { + log.Printf("AskForBlocks error: %s", err.Error()) + return } } @@ -153,13 +150,12 @@ func (s *SPVCon) TxHandler(m *wire.MsgTx) { i, dub.String(), m.TxSha().String()) } } - hits, err := s.TS.Ingest(m, height) if err != nil { log.Printf("Incoming Tx error: %s\n", err.Error()) return } - if hits == 0 { + if hits == 0 && !s.HardMode { log.Printf("tx %s had no hits, filter false positive.", m.TxSha().String()) s.fPositives <- 1 // add one false positive to chan