Merge pull request #1633 from tendermint/jae/blockchain_pool
Potential fix for blockchain pool halting issue
This commit is contained in:
commit
1c0bfe1158
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
|
@ -66,11 +67,13 @@ type BlockPool struct {
|
|||
// block requests
|
||||
requesters map[int64]*bpRequester
|
||||
height int64 // the lowest key in requesters.
|
||||
numPending int32 // number of requests pending assignment or block response
|
||||
// peers
|
||||
peers map[p2p.ID]*bpPeer
|
||||
maxPeerHeight int64
|
||||
|
||||
// atomic
|
||||
numPending int32 // number of requests pending assignment or block response
|
||||
|
||||
requestsCh chan<- BlockRequest
|
||||
errorsCh chan<- peerError
|
||||
}
|
||||
|
@ -151,7 +154,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
|
|||
pool.mtx.Lock()
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
return pool.height, pool.numPending, len(pool.requesters)
|
||||
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
|
||||
}
|
||||
|
||||
// TODO: relax conditions, prevent abuse.
|
||||
|
@ -245,7 +248,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
|||
}
|
||||
|
||||
if requester.setBlock(block, peerID) {
|
||||
pool.numPending--
|
||||
atomic.AddInt32(&pool.numPending, -1)
|
||||
peer := pool.peers[peerID]
|
||||
if peer != nil {
|
||||
peer.decrPending(blockSize)
|
||||
|
@ -291,10 +294,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
|
|||
func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
||||
for _, requester := range pool.requesters {
|
||||
if requester.getPeerID() == peerID {
|
||||
if requester.getBlock() != nil {
|
||||
pool.numPending++
|
||||
}
|
||||
go requester.redo() // pick another peer and ...
|
||||
requester.redo()
|
||||
}
|
||||
}
|
||||
delete(pool.peers, peerID)
|
||||
|
@ -332,7 +332,7 @@ func (pool *BlockPool) makeNextRequester() {
|
|||
// request.SetLogger(pool.Logger.With("height", nextHeight))
|
||||
|
||||
pool.requesters[nextHeight] = request
|
||||
pool.numPending++
|
||||
atomic.AddInt32(&pool.numPending, 1)
|
||||
|
||||
err := request.Start()
|
||||
if err != nil {
|
||||
|
@ -360,7 +360,7 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
|
|||
|
||||
// unused by tendermint; left for debugging purposes
|
||||
func (pool *BlockPool) debug() string {
|
||||
pool.mtx.Lock() // Lock
|
||||
pool.mtx.Lock()
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
str := ""
|
||||
|
@ -466,8 +466,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
|
|||
bpr := &bpRequester{
|
||||
pool: pool,
|
||||
height: height,
|
||||
gotBlockCh: make(chan struct{}),
|
||||
redoCh: make(chan struct{}),
|
||||
gotBlockCh: make(chan struct{}, 1),
|
||||
redoCh: make(chan struct{}, 1),
|
||||
|
||||
peerID: "",
|
||||
block: nil,
|
||||
|
@ -481,7 +481,7 @@ func (bpr *bpRequester) OnStart() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Returns true if the peer matches
|
||||
// Returns true if the peer matches and block doesn't already exist.
|
||||
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
||||
bpr.mtx.Lock()
|
||||
if bpr.block != nil || bpr.peerID != peerID {
|
||||
|
@ -491,7 +491,10 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
|||
bpr.block = block
|
||||
bpr.mtx.Unlock()
|
||||
|
||||
bpr.gotBlockCh <- struct{}{}
|
||||
select {
|
||||
case bpr.gotBlockCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -507,17 +510,27 @@ func (bpr *bpRequester) getPeerID() p2p.ID {
|
|||
return bpr.peerID
|
||||
}
|
||||
|
||||
// This is called from the requestRoutine, upon redo().
|
||||
func (bpr *bpRequester) reset() {
|
||||
bpr.mtx.Lock()
|
||||
defer bpr.mtx.Unlock()
|
||||
|
||||
if bpr.block != nil {
|
||||
atomic.AddInt32(&bpr.pool.numPending, 1)
|
||||
}
|
||||
|
||||
bpr.peerID = ""
|
||||
bpr.block = nil
|
||||
bpr.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Tells bpRequester to pick another peer and try again.
|
||||
// NOTE: blocking
|
||||
// NOTE: Nonblocking, and does nothing if another redo
|
||||
// was already requested.
|
||||
func (bpr *bpRequester) redo() {
|
||||
bpr.redoCh <- struct{}{}
|
||||
select {
|
||||
case bpr.redoCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Responsible for making more requests as necessary
|
||||
|
@ -546,17 +559,8 @@ OUTER_LOOP:
|
|||
|
||||
// Send request and wait.
|
||||
bpr.pool.sendRequest(bpr.height, peer.id)
|
||||
select {
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
return
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.redoCh:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP // When peer is removed
|
||||
case <-bpr.gotBlockCh:
|
||||
// We got the block, now see if it's good.
|
||||
WAIT_LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
|
@ -566,6 +570,10 @@ OUTER_LOOP:
|
|||
case <-bpr.redoCh:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
case <-bpr.gotBlockCh:
|
||||
// We got a block!
|
||||
// Continue the for-loop and wait til Quit.
|
||||
continue WAIT_LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue