Potential fix for blockchain pool halting issue
This commit is contained in:
parent
7f20eb5f8e
commit
f55725ebfa
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
|
@ -66,11 +67,13 @@ type BlockPool struct {
|
||||||
// block requests
|
// block requests
|
||||||
requesters map[int64]*bpRequester
|
requesters map[int64]*bpRequester
|
||||||
height int64 // the lowest key in requesters.
|
height int64 // the lowest key in requesters.
|
||||||
numPending int32 // number of requests pending assignment or block response
|
|
||||||
// peers
|
// peers
|
||||||
peers map[p2p.ID]*bpPeer
|
peers map[p2p.ID]*bpPeer
|
||||||
maxPeerHeight int64
|
maxPeerHeight int64
|
||||||
|
|
||||||
|
// atomic
|
||||||
|
numPending int32 // number of requests pending assignment or block response
|
||||||
|
|
||||||
requestsCh chan<- BlockRequest
|
requestsCh chan<- BlockRequest
|
||||||
errorsCh chan<- peerError
|
errorsCh chan<- peerError
|
||||||
}
|
}
|
||||||
|
@ -151,7 +154,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
|
||||||
pool.mtx.Lock()
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
return pool.height, pool.numPending, len(pool.requesters)
|
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: relax conditions, prevent abuse.
|
// TODO: relax conditions, prevent abuse.
|
||||||
|
@ -245,7 +248,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
if requester.setBlock(block, peerID) {
|
if requester.setBlock(block, peerID) {
|
||||||
pool.numPending--
|
atomic.AddInt32(&pool.numPending, -1)
|
||||||
peer := pool.peers[peerID]
|
peer := pool.peers[peerID]
|
||||||
if peer != nil {
|
if peer != nil {
|
||||||
peer.decrPending(blockSize)
|
peer.decrPending(blockSize)
|
||||||
|
@ -291,10 +294,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
|
||||||
func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
||||||
for _, requester := range pool.requesters {
|
for _, requester := range pool.requesters {
|
||||||
if requester.getPeerID() == peerID {
|
if requester.getPeerID() == peerID {
|
||||||
if requester.getBlock() != nil {
|
requester.redo()
|
||||||
pool.numPending++
|
|
||||||
}
|
|
||||||
go requester.redo() // pick another peer and ...
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(pool.peers, peerID)
|
delete(pool.peers, peerID)
|
||||||
|
@ -332,7 +332,7 @@ func (pool *BlockPool) makeNextRequester() {
|
||||||
// request.SetLogger(pool.Logger.With("height", nextHeight))
|
// request.SetLogger(pool.Logger.With("height", nextHeight))
|
||||||
|
|
||||||
pool.requesters[nextHeight] = request
|
pool.requesters[nextHeight] = request
|
||||||
pool.numPending++
|
atomic.AddInt32(&pool.numPending, 1)
|
||||||
|
|
||||||
err := request.Start()
|
err := request.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -360,7 +360,7 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
|
||||||
|
|
||||||
// unused by tendermint; left for debugging purposes
|
// unused by tendermint; left for debugging purposes
|
||||||
func (pool *BlockPool) debug() string {
|
func (pool *BlockPool) debug() string {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
str := ""
|
str := ""
|
||||||
|
@ -466,8 +466,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
|
||||||
bpr := &bpRequester{
|
bpr := &bpRequester{
|
||||||
pool: pool,
|
pool: pool,
|
||||||
height: height,
|
height: height,
|
||||||
gotBlockCh: make(chan struct{}),
|
gotBlockCh: make(chan struct{}, 1),
|
||||||
redoCh: make(chan struct{}),
|
redoCh: make(chan struct{}, 1),
|
||||||
|
|
||||||
peerID: "",
|
peerID: "",
|
||||||
block: nil,
|
block: nil,
|
||||||
|
@ -481,7 +481,7 @@ func (bpr *bpRequester) OnStart() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if the peer matches
|
// Returns true if the peer matches and block doesn't already exist.
|
||||||
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
||||||
bpr.mtx.Lock()
|
bpr.mtx.Lock()
|
||||||
if bpr.block != nil || bpr.peerID != peerID {
|
if bpr.block != nil || bpr.peerID != peerID {
|
||||||
|
@ -491,7 +491,10 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
|
||||||
bpr.block = block
|
bpr.block = block
|
||||||
bpr.mtx.Unlock()
|
bpr.mtx.Unlock()
|
||||||
|
|
||||||
bpr.gotBlockCh <- struct{}{}
|
select {
|
||||||
|
case bpr.gotBlockCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,17 +510,27 @@ func (bpr *bpRequester) getPeerID() p2p.ID {
|
||||||
return bpr.peerID
|
return bpr.peerID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is called from the requestRoutine, upon redo().
|
||||||
func (bpr *bpRequester) reset() {
|
func (bpr *bpRequester) reset() {
|
||||||
bpr.mtx.Lock()
|
bpr.mtx.Lock()
|
||||||
|
defer bpr.mtx.Unlock()
|
||||||
|
|
||||||
|
if bpr.block != nil {
|
||||||
|
atomic.AddInt32(&bpr.pool.numPending, 1)
|
||||||
|
}
|
||||||
|
|
||||||
bpr.peerID = ""
|
bpr.peerID = ""
|
||||||
bpr.block = nil
|
bpr.block = nil
|
||||||
bpr.mtx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tells bpRequester to pick another peer and try again.
|
// Tells bpRequester to pick another peer and try again.
|
||||||
// NOTE: blocking
|
// NOTE: Nonblocking, and does nothing if another redo
|
||||||
|
// was already requested.
|
||||||
func (bpr *bpRequester) redo() {
|
func (bpr *bpRequester) redo() {
|
||||||
bpr.redoCh <- struct{}{}
|
select {
|
||||||
|
case bpr.redoCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Responsible for making more requests as necessary
|
// Responsible for making more requests as necessary
|
||||||
|
@ -546,17 +559,8 @@ OUTER_LOOP:
|
||||||
|
|
||||||
// Send request and wait.
|
// Send request and wait.
|
||||||
bpr.pool.sendRequest(bpr.height, peer.id)
|
bpr.pool.sendRequest(bpr.height, peer.id)
|
||||||
select {
|
WAIT_LOOP:
|
||||||
case <-bpr.pool.Quit():
|
for {
|
||||||
bpr.Stop()
|
|
||||||
return
|
|
||||||
case <-bpr.Quit():
|
|
||||||
return
|
|
||||||
case <-bpr.redoCh:
|
|
||||||
bpr.reset()
|
|
||||||
continue OUTER_LOOP // When peer is removed
|
|
||||||
case <-bpr.gotBlockCh:
|
|
||||||
// We got the block, now see if it's good.
|
|
||||||
select {
|
select {
|
||||||
case <-bpr.pool.Quit():
|
case <-bpr.pool.Quit():
|
||||||
bpr.Stop()
|
bpr.Stop()
|
||||||
|
@ -566,6 +570,10 @@ OUTER_LOOP:
|
||||||
case <-bpr.redoCh:
|
case <-bpr.redoCh:
|
||||||
bpr.reset()
|
bpr.reset()
|
||||||
continue OUTER_LOOP
|
continue OUTER_LOOP
|
||||||
|
case <-bpr.gotBlockCh:
|
||||||
|
// We got a block!
|
||||||
|
// Continue the for-loop and wait til Quit.
|
||||||
|
continue WAIT_LOOP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue