Merge pull request #692 from ethersphere/frontier/blockpool

td update from node + bugfix
This commit is contained in:
Jeffrey Wilcke 2015-04-10 18:20:23 +02:00
commit 6107b53de0
4 changed files with 149 additions and 140 deletions

View File

@ -169,6 +169,9 @@ type BlockPool struct {
// alloc-easy pool of hash slices // alloc-easy pool of hash slices
hashSlicePool chan []common.Hash hashSlicePool chan []common.Hash
nodeCache map[common.Hash]*node
nodeCacheLock sync.RWMutex
// waitgroup is used in tests to wait for result-critical routines // waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status // as well as in determining idle / syncing status
wg sync.WaitGroup // wg sync.WaitGroup //
@ -210,6 +213,7 @@ func (self *BlockPool) Start() {
self.Config.init() self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150) self.hashSlicePool = make(chan []common.Hash, 150)
self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus() self.status = newStatus()
self.quit = make(chan bool) self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry) self.pool = make(map[common.Hash]*entry)
@ -615,53 +619,25 @@ LOOP:
If the block received is the head block of the current best peer, signal it to the head section process If the block received is the head block of the current best peer, signal it to the head section process
*/ */
func (self *BlockPool) AddBlock(block *types.Block, peerId string) { func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
self.status.lock.Lock() self.status.lock.Lock()
self.status.activePeers[peerId]++ self.status.activePeers[peerId]++
self.status.lock.Unlock() self.status.lock.Unlock()
hash := block.Hash()
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
return
}
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)
entry := self.get(hash) entry := self.get(hash)
blockIsCurrentHead := false
sender.lock.RLock()
currentBlockHash := sender.currentBlockHash
currentBlock := sender.currentBlock
currentBlockC := sender.currentBlockC
switchC := sender.switchC
sender.lock.RUnlock()
// a peer's current head block is appearing the first time
if hash == currentBlockHash {
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
blockIsCurrentHead = true
if currentBlock == nil {
sender.lock.Lock()
sender.setChainInfoFromBlock(block)
sender.lock.Unlock()
self.status.lock.Lock()
self.status.values.BlockHashes++
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
// signal to head section process
select {
case currentBlockC <- block:
case <-switchC:
}
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash))
}
} else {
plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash))
/* @zelig !!! /* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
@ -676,42 +652,47 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
return return
} }
*/ */
}
var bnode *node
if entry == nil { if entry == nil {
// FIXME: here check the cache find or create node - self.nodeCacheLock.Lock()
// put peer as blockBy! bnode, _ = self.nodeCache[hash]
return if bnode == nil {
bnode = &node{
hash: currentBlockHash,
block: block,
hashBy: peerId,
blockBy: peerId,
td: tdFromCurrentHead,
}
self.nodeCache[hash] = bnode
}
self.nodeCacheLock.Unlock()
} else {
bnode = entry.node
} }
node := entry.node bnode.lock.Lock()
node.lock.Lock() defer bnode.lock.Unlock()
defer node.lock.Unlock()
// check if block already received
if bnode.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
// register peer on node as source // register peer on node as source
if node.peers == nil { if bnode.peers == nil {
node.peers = make(map[string]bool) bnode.peers = make(map[string]bool)
} }
FoundBlockCurrentHead, found := node.peers[sender.id] foundBlockCurrentHead, found := bnode.peers[sender.id]
if !found || FoundBlockCurrentHead { if !found || foundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update // if found but not FoundBlockCurrentHead, then no update
// necessary (||) // necessary (||)
node.peers[sender.id] = blockIsCurrentHead bnode.peers[sender.id] = (currentBlockHash == hash)
// for those that are false, TD will update their head // for those that are false, TD will update their head
// for those that are true, TD is checked ! // for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD // this is checked at the time of TD calculation in checkTD
} }
// check if block already received sender.setChainInfoFromNode(bnode)
if node.block != nil { } else {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
}
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
return
}
/* /*
@zelig needs discussing @zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache Viktor: pow check can be delayed in a go routine and therefore cache
@ -728,14 +709,14 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
return return
} }
*/ */
bnode.block = block
node.block = block bnode.blockBy = peerId
node.blockBy = peerId bnode.td = tdFromCurrentHead
self.status.lock.Lock() self.status.lock.Lock()
self.status.values.Blocks++ self.status.values.Blocks++
self.status.values.BlocksInPool++ self.status.values.BlocksInPool++
self.status.lock.Unlock() self.status.lock.Unlock()
}
} }

View File

@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) {
} }
func TestIncorrectTD(t *testing.T) { func TestIncorrectTD(t *testing.T) {
t.Skip() // td not tested atm t.Skip("skipping TD check until network is healthy")
test.LogInit() test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t) _, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil blockPoolTester.blockChain[0] = nil

View File

@ -18,6 +18,7 @@ type peer struct {
// last known blockchain status // last known blockchain status
td *big.Int td *big.Int
tdAdvertised bool
currentBlockHash common.Hash currentBlockHash common.Hash
currentBlock *types.Block currentBlock *types.Block
parentHash common.Hash parentHash common.Hash
@ -135,21 +136,52 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
} }
// caller must hold peer lock // caller must hold peer lock
func (self *peer) setChainInfo(td *big.Int, c common.Hash) { func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
if self.currentBlockHash != currentBlockHash {
previousBlockHash := self.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
self.td = td self.td = td
self.currentBlockHash = c self.currentBlockHash = currentBlockHash
self.currentBlock = nil self.currentBlock = nil
self.parentHash = common.Hash{} self.parentHash = common.Hash{}
self.headSection = nil self.headSection = nil
}
self.tdAdvertised = true
} }
// caller must hold peer lock func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
func (self *peer) setChainInfoFromBlock(block *types.Block) { self.lock.Lock()
// use the optional TD to update peer td, this helps second best peer selection defer self.lock.Unlock()
hash := block.Hash()
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
currentBlockHash = self.currentBlockHash
if currentBlockHash == hash && self.currentBlock == nil {
// signal to head section process
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
select {
case self.currentBlockC <- block:
case <-self.switchC:
}
return self.td, currentBlockHash
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
return nil, currentBlockHash
}
}
// this will use the TD given by the first peer to update peer td, this helps second best peer selection
// :FIXME: node
func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost // in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 { block := n.block
plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td) hash := block.Hash()
self.td = block.Td if n.td != nil && n.td.Cmp(self.td) > 0 {
plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
self.td = n.td
self.currentBlockHash = block.Hash() self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash() self.parentHash = block.ParentHash()
self.currentBlock = block self.currentBlock = block
@ -218,17 +250,11 @@ func (self *peers) addPeer(
if found { if found {
// when called on an already connected peer, it means a newBlockMsg is received // when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated // peer head info is updated
p.lock.Lock()
if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash) p.setChainInfo(td, currentBlockHash)
// FIXME: only count the same block once
self.status.lock.Lock() self.status.lock.Lock()
self.status.values.NewBlocks++ self.status.values.NewBlocks++
self.status.lock.Unlock() self.status.lock.Unlock()
}
p.lock.Unlock()
} else { } else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError) p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
@ -333,8 +359,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
close(oldp.switchC) close(oldp.switchC)
} }
if newp != nil { if newp != nil {
newp.idleC = make(chan bool) // newp.idleC = make(chan bool)
newp.switchC = make(chan bool) // newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it // if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections // otherwise head section is an element of peer.sections
if newp.headSection == nil { if newp.headSection == nil {
@ -354,6 +380,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
} }
}() }()
} else {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
} }
var connected = make(map[common.Hash]*section) var connected = make(map[common.Hash]*section)
@ -528,10 +557,12 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process // main loop for head section process
func (self *peer) run() { func (self *peer) run() {
self.lock.RLock() self.lock.Lock()
self.switchC = make(chan bool)
self.idleC = make(chan bool)
switchC := self.switchC switchC := self.switchC
plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash)) plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
self.lock.RUnlock() self.lock.Unlock()
self.blockHashesRequestTimer = nil self.blockHashesRequestTimer = nil

View File

@ -145,7 +145,6 @@ func TestAddPeer(t *testing.T) {
} }
func TestPeerPromotionByTdOnBlock(t *testing.T) { func TestPeerPromotionByTdOnBlock(t *testing.T) {
t.Skip()
test.LogInit() test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t) _, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil blockPoolTester.blockChain[0] = nil
@ -155,28 +154,26 @@ func TestPeerPromotionByTdOnBlock(t *testing.T) {
peer2 := blockPoolTester.newPeer("peer2", 4, 4) peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start() blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[3] = 3
// pool
peer0.AddPeer() peer0.AddPeer()
peer0.serveBlocks(1, 2) peer0.serveBlocks(1, 2)
best := peer1.AddPeer() best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet // this tests that peer1 is not promoted over peer0 yet
if best { if best {
t.Errorf("peer1 (TD=1) should not be set as best") t.Errorf("peer1 (TD=1) should not be set as best")
return
} }
best = peer2.AddPeer() best = peer2.AddPeer()
peer2.serveBlocks(3, 4) peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1) peer2.serveBlockHashes(4, 3, 2, 1)
// hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3}) peer1.sendBlocks(3, 4)
peer1.serveBlocks(2, 3)
blockPool.RemovePeer("peer2") blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" { if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best") t.Errorf("peer1 (TD=3) should be set as best")
return
} }
peer1.serveBlocks(0, 1, 2) peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout) blockPool.Wait(waitTimeout)
blockPool.Stop() blockPool.Stop()