mirror of https://github.com/poanetwork/quorum.git
Merge pull request #830 from obscuren/downloader-missing-parent
eth/downloader: missing parent improvement
This commit is contained in:
commit
3fef601903
|
@ -47,7 +47,7 @@ import _ "net/http/pprof"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ClientIdentifier = "Geth"
|
ClientIdentifier = "Geth"
|
||||||
Version = "0.9.13"
|
Version = "0.9.14"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -172,7 +172,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
|
||||||
n++
|
n++
|
||||||
|
|
||||||
if n == batchSize {
|
if n == batchSize {
|
||||||
if err := chainmgr.InsertChain(blocks); err != nil {
|
if _, err := chainmgr.InsertChain(blocks); err != nil {
|
||||||
return fmt.Errorf("invalid block %v", err)
|
return fmt.Errorf("invalid block %v", err)
|
||||||
}
|
}
|
||||||
n = 0
|
n = 0
|
||||||
|
@ -181,7 +181,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
if err := chainmgr.InsertChain(blocks[:n]); err != nil {
|
if _, err := chainmgr.InsertChain(blocks[:n]); err != nil {
|
||||||
return fmt.Errorf("invalid block %v", err)
|
return fmt.Errorf("invalid block %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,6 +141,6 @@ func newCanonical(n int, db common.Database) (*BlockProcessor, error) {
|
||||||
return bman, nil
|
return bman, nil
|
||||||
}
|
}
|
||||||
lchain := makeChain(bman, parent, n, db, CanonicalSeed)
|
lchain := makeChain(bman, parent, n, db, CanonicalSeed)
|
||||||
err := bman.bc.InsertChain(lchain)
|
_, err := bman.bc.InsertChain(lchain)
|
||||||
return bman, err
|
return bman, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,7 +497,9 @@ func (self *ChainManager) procFutureBlocks() {
|
||||||
self.InsertChain(blocks)
|
self.InsertChain(blocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *ChainManager) InsertChain(chain types.Blocks) error {
|
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
|
||||||
|
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
|
||||||
|
func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
|
||||||
// A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
|
// A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
|
||||||
var (
|
var (
|
||||||
queue = make([]interface{}, len(chain))
|
queue = make([]interface{}, len(chain))
|
||||||
|
@ -540,7 +542,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
|
||||||
glog.V(logger.Error).Infoln(err)
|
glog.V(logger.Error).Infoln(err)
|
||||||
glog.V(logger.Debug).Infoln(block)
|
glog.V(logger.Debug).Infoln(block)
|
||||||
|
|
||||||
return err
|
return i, err
|
||||||
}
|
}
|
||||||
|
|
||||||
block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash())))
|
block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash())))
|
||||||
|
@ -591,7 +593,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if glog.V(logger.Detail) {
|
if glog.V(logger.Detail) {
|
||||||
glog.Infof("inserted forked block #%d (%d TXs %d UNCs) (%x...)\n", block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
|
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
|
||||||
}
|
}
|
||||||
|
|
||||||
queue[i] = ChainSideEvent{block, logs}
|
queue[i] = ChainSideEvent{block, logs}
|
||||||
|
@ -613,7 +615,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
|
||||||
|
|
||||||
go self.eventMux.Post(queueEvent)
|
go self.eventMux.Post(queueEvent)
|
||||||
|
|
||||||
return nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
|
// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
@ -44,7 +45,7 @@ func testFork(t *testing.T, bman *BlockProcessor, i, N int, f func(td1, td2 *big
|
||||||
// extend the fork
|
// extend the fork
|
||||||
parent := bman2.bc.CurrentBlock()
|
parent := bman2.bc.CurrentBlock()
|
||||||
chainB := makeChain(bman2, parent, N, db, ForkSeed)
|
chainB := makeChain(bman2, parent, N, db, ForkSeed)
|
||||||
err = bman2.bc.InsertChain(chainB)
|
_, err = bman2.bc.InsertChain(chainB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Insert chain error for fork:", err)
|
t.Fatal("Insert chain error for fork:", err)
|
||||||
}
|
}
|
||||||
|
@ -108,7 +109,7 @@ func loadChain(fn string, t *testing.T) (types.Blocks, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func insertChain(done chan bool, chainMan *ChainManager, chain types.Blocks, t *testing.T) {
|
func insertChain(done chan bool, chainMan *ChainManager, chain types.Blocks, t *testing.T) {
|
||||||
err := chainMan.InsertChain(chain)
|
_, err := chainMan.InsertChain(chain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
|
@ -369,11 +370,8 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block
|
||||||
return chain
|
return chain
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReorg(t *testing.T) {
|
func chm(genesis *types.Block, db common.Database) *ChainManager {
|
||||||
db, _ := ethdb.NewMemDatabase()
|
|
||||||
var eventMux event.TypeMux
|
var eventMux event.TypeMux
|
||||||
|
|
||||||
genesis := GenesisBlock(db)
|
|
||||||
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux}
|
bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux}
|
||||||
bc.cache = NewBlockCache(100)
|
bc.cache = NewBlockCache(100)
|
||||||
bc.futureBlocks = NewBlockCache(100)
|
bc.futureBlocks = NewBlockCache(100)
|
||||||
|
@ -381,6 +379,14 @@ func TestReorg(t *testing.T) {
|
||||||
bc.ResetWithGenesisBlock(genesis)
|
bc.ResetWithGenesisBlock(genesis)
|
||||||
bc.txState = state.ManageState(bc.State())
|
bc.txState = state.ManageState(bc.State())
|
||||||
|
|
||||||
|
return bc
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReorgLongest(t *testing.T) {
|
||||||
|
db, _ := ethdb.NewMemDatabase()
|
||||||
|
genesis := GenesisBlock(db)
|
||||||
|
bc := chm(genesis, db)
|
||||||
|
|
||||||
chain1 := makeChainWithDiff(genesis, []int{1, 2, 4}, 10)
|
chain1 := makeChainWithDiff(genesis, []int{1, 2, 4}, 10)
|
||||||
chain2 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11)
|
chain2 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11)
|
||||||
|
|
||||||
|
@ -394,3 +400,22 @@ func TestReorg(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReorgShortest(t *testing.T) {
|
||||||
|
db, _ := ethdb.NewMemDatabase()
|
||||||
|
genesis := GenesisBlock(db)
|
||||||
|
bc := chm(genesis, db)
|
||||||
|
|
||||||
|
chain1 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 10)
|
||||||
|
chain2 := makeChainWithDiff(genesis, []int{1, 10}, 11)
|
||||||
|
|
||||||
|
bc.InsertChain(chain1)
|
||||||
|
bc.InsertChain(chain2)
|
||||||
|
|
||||||
|
prev := bc.CurrentBlock()
|
||||||
|
for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) {
|
||||||
|
if prev.ParentHash() != block.Hash() {
|
||||||
|
t.Errorf("parent hash mismatch %x - %x", prev.ParentHash(), block.Hash())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -306,6 +306,27 @@ func (pool *TxPool) checkQueue() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pool *TxPool) removeTx(hash common.Hash) {
|
||||||
|
// delete from pending pool
|
||||||
|
delete(pool.txs, hash)
|
||||||
|
|
||||||
|
// delete from queue
|
||||||
|
out:
|
||||||
|
for address, txs := range pool.queue {
|
||||||
|
for i, tx := range txs {
|
||||||
|
if tx.Hash() == hash {
|
||||||
|
if len(txs) == 1 {
|
||||||
|
// if only one tx, remove entire address entry
|
||||||
|
delete(pool.queue, address)
|
||||||
|
} else {
|
||||||
|
pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
|
||||||
|
}
|
||||||
|
break out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (pool *TxPool) validatePool() {
|
func (pool *TxPool) validatePool() {
|
||||||
pool.mu.Lock()
|
pool.mu.Lock()
|
||||||
defer pool.mu.Unlock()
|
defer pool.mu.Unlock()
|
||||||
|
@ -316,7 +337,7 @@ func (pool *TxPool) validatePool() {
|
||||||
glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
|
glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(pool.txs, hash)
|
pool.removeTx(hash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,3 +111,30 @@ func TestTransactionQueue(t *testing.T) {
|
||||||
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
|
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemoveTx(t *testing.T) {
|
||||||
|
pool, key := setupTxPool()
|
||||||
|
tx := transaction()
|
||||||
|
tx.SignECDSA(key)
|
||||||
|
from, _ := tx.From()
|
||||||
|
pool.currentState().AddBalance(from, big.NewInt(1))
|
||||||
|
pool.queueTx(tx)
|
||||||
|
pool.addTx(tx)
|
||||||
|
if len(pool.queue) != 1 {
|
||||||
|
t.Error("expected queue to be 1, got", len(pool.queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pool.txs) != 1 {
|
||||||
|
t.Error("expected txs to be 1, got", len(pool.txs))
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.removeTx(tx.Hash())
|
||||||
|
|
||||||
|
if len(pool.queue) > 0 {
|
||||||
|
t.Error("expected queue to be 0, got", len(pool.queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pool.txs) > 0 {
|
||||||
|
t.Error("expected txs to be 0, got", len(pool.txs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -99,6 +99,8 @@ type Block struct {
|
||||||
Td *big.Int
|
Td *big.Int
|
||||||
queued bool // flag for blockpool to skip TD check
|
queued bool // flag for blockpool to skip TD check
|
||||||
|
|
||||||
|
ReceivedAt time.Time
|
||||||
|
|
||||||
receipts Receipts
|
receipts Receipts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type hashCheckFn func(common.Hash) bool
|
type hashCheckFn func(common.Hash) bool
|
||||||
type chainInsertFn func(types.Blocks) error
|
type chainInsertFn func(types.Blocks) (int, error)
|
||||||
type hashIterFn func() (common.Hash, error)
|
type hashIterFn func() (common.Hash, error)
|
||||||
|
|
||||||
type blockPack struct {
|
type blockPack struct {
|
||||||
|
@ -418,27 +418,30 @@ func (d *Downloader) process(peer *peer) error {
|
||||||
// link). We should at least check whihc queue match. This code could move
|
// link). We should at least check whihc queue match. This code could move
|
||||||
// to a seperate goroutine where it periodically checks for linked pieces.
|
// to a seperate goroutine where it periodically checks for linked pieces.
|
||||||
types.BlockBy(types.Number).Sort(d.queue.blocks)
|
types.BlockBy(types.Number).Sort(d.queue.blocks)
|
||||||
blocks := d.queue.blocks
|
if len(d.queue.blocks) == 0 {
|
||||||
if len(blocks) == 0 {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
blocks = d.queue.blocks
|
||||||
|
err error
|
||||||
|
)
|
||||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
||||||
|
|
||||||
var err error
|
|
||||||
// Loop untill we're out of blocks
|
// Loop untill we're out of blocks
|
||||||
for len(blocks) != 0 {
|
for len(blocks) != 0 {
|
||||||
max := int(math.Min(float64(len(blocks)), 256))
|
max := int(math.Min(float64(len(blocks)), 256))
|
||||||
// TODO check for parent error. When there's a parent error we should stop
|
// TODO check for parent error. When there's a parent error we should stop
|
||||||
// processing and start requesting the `block.hash` so that it's parent and
|
// processing and start requesting the `block.hash` so that it's parent and
|
||||||
// grandparents can be requested and queued.
|
// grandparents can be requested and queued.
|
||||||
err = d.insertChain(blocks[:max])
|
var i int
|
||||||
|
i, err = d.insertChain(blocks[:max])
|
||||||
if err != nil && core.IsParentErr(err) {
|
if err != nil && core.IsParentErr(err) {
|
||||||
glog.V(logger.Debug).Infoln("Aborting process due to missing parent.")
|
// Ignore the missing blocks. Handler should take care of anything that's missing.
|
||||||
|
glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i)
|
||||||
|
blocks = blocks[i+1:]
|
||||||
|
|
||||||
// XXX this needs a lot of attention
|
continue
|
||||||
blocks = nil
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
// immediatly unregister the false peer but do not disconnect
|
// immediatly unregister the false peer but do not disconnect
|
||||||
d.UnregisterPeer(d.activePeer)
|
d.UnregisterPeer(d.activePeer)
|
||||||
|
|
|
@ -62,10 +62,10 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) insertChain(blocks types.Blocks) error {
|
func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
|
||||||
dl.insertedBlocks += len(blocks)
|
dl.insertedBlocks += len(blocks)
|
||||||
|
|
||||||
return nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) getHashes(hash common.Hash) error {
|
func (dl *downloadTester) getHashes(hash common.Hash) error {
|
||||||
|
|
|
@ -346,6 +346,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
||||||
if err := request.Block.ValidateFields(); err != nil {
|
if err := request.Block.ValidateFields(); err != nil {
|
||||||
return errResp(ErrDecode, "block validation %v: %v", msg, err)
|
return errResp(ErrDecode, "block validation %v: %v", msg, err)
|
||||||
}
|
}
|
||||||
|
request.Block.ReceivedAt = msg.ReceivedAt
|
||||||
|
|
||||||
hash := request.Block.Hash()
|
hash := request.Block.Hash()
|
||||||
// Add the block hash as a known hash to the peer. This will later be used to determine
|
// Add the block hash as a known hash to the peer. This will later be used to determine
|
||||||
// who should receive this.
|
// who should receive this.
|
||||||
|
@ -376,7 +378,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
||||||
// if the parent exists we process the block and propagate to our peers
|
// if the parent exists we process the block and propagate to our peers
|
||||||
// if the parent does not exists we delegate to the downloader.
|
// if the parent does not exists we delegate to the downloader.
|
||||||
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
if self.chainman.HasBlock(request.Block.ParentHash()) {
|
||||||
if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
|
||||||
// handle error
|
// handle error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -419,7 +421,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
|
||||||
for _, peer := range peers {
|
for _, peer := range peers {
|
||||||
peer.sendNewBlock(block)
|
peer.sendNewBlock(block)
|
||||||
}
|
}
|
||||||
glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers")
|
glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total propagation time:", time.Since(block.ReceivedAt))
|
||||||
}
|
}
|
||||||
|
|
||||||
// BroadcastTx will propagate the block to its connected peers. It will sort
|
// BroadcastTx will propagate the block to its connected peers. It will sort
|
||||||
|
|
|
@ -184,7 +184,7 @@ func (self *worker) wait() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := self.chain.InsertChain(types.Blocks{block}); err == nil {
|
if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil {
|
||||||
for _, uncle := range block.Uncles() {
|
for _, uncle := range block.Uncles() {
|
||||||
delete(self.possibleUncles, uncle.Hash())
|
delete(self.possibleUncles, uncle.Hash())
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ type Msg struct {
|
||||||
Code uint64
|
Code uint64
|
||||||
Size uint32 // size of the paylod
|
Size uint32 // size of the paylod
|
||||||
Payload io.Reader
|
Payload io.Reader
|
||||||
|
ReceivedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode parses the RLP content of a message into
|
// Decode parses the RLP content of a message into
|
||||||
|
|
|
@ -177,6 +177,7 @@ func (p *Peer) readLoop(errc chan<- error) {
|
||||||
errc <- err
|
errc <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
msg.ReceivedAt = time.Now()
|
||||||
if err = p.handle(msg); err != nil {
|
if err = p.handle(msg); err != nil {
|
||||||
errc <- err
|
errc <- err
|
||||||
return
|
return
|
||||||
|
|
|
@ -162,7 +162,7 @@ func (t *BlockTest) TryBlocksInsert(chainManager *core.ChainManager) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// RLP decoding worked, try to insert into chain:
|
// RLP decoding worked, try to insert into chain:
|
||||||
err = chainManager.InsertChain(types.Blocks{cb})
|
_, err = chainManager.InsertChain(types.Blocks{cb})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if b.BlockHeader == nil {
|
if b.BlockHeader == nil {
|
||||||
continue // OK - block is supposed to be invalid, continue with next block
|
continue // OK - block is supposed to be invalid, continue with next block
|
||||||
|
|
Loading…
Reference in New Issue