Unify blockpool mtxs

This commit is contained in:
Jae Kwon 2016-06-28 18:02:27 -07:00
parent ca674304c5
commit b25cfb0e0b
1 changed files with 40 additions and 65 deletions

View File

@ -35,15 +35,13 @@ type BlockPool struct {
QuitService QuitService
startTime time.Time startTime time.Time
mtx sync.Mutex
// block requests // block requests
mtx sync.Mutex
requesters map[int]*bpRequester requesters map[int]*bpRequester
height int // the lowest key in requesters. height int // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response numPending int32 // number of requests pending assignment or block response
// peers // peers
peersMtx sync.Mutex peers map[string]*bpPeer
peers map[string]*bpPeer
requestsCh chan<- BlockRequest requestsCh chan<- BlockRequest
timeoutsCh chan<- string timeoutsCh chan<- string
@ -100,8 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() {
} }
func (pool *BlockPool) removeTimedoutPeers() { func (pool *BlockPool) removeTimedoutPeers() {
pool.peersMtx.Lock() // Lock pool.mtx.Lock()
defer pool.peersMtx.Unlock() defer pool.mtx.Unlock()
for _, peer := range pool.peers { for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 { if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate curRate := peer.recvMonitor.Status().CurRate
@ -119,7 +118,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
} }
func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
pool.mtx.Lock() // Lock pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
return pool.height, pool.numPending, len(pool.requesters) return pool.height, pool.numPending, len(pool.requesters)
@ -127,10 +126,10 @@ func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters
// TODO: relax conditions, prevent abuse. // TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool { func (pool *BlockPool) IsCaughtUp() bool {
height, _, _ := pool.GetStatus() pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.peersMtx.Lock() height := pool.height
defer pool.peersMtx.Unlock()
// Need at least 1 peer to be considered caught up. // Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 { if len(pool.peers) == 0 {
@ -152,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
// So we peek two blocks at a time. // So we peek two blocks at a time.
// The caller will verify the commit. // The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock() // Lock pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil { if r := pool.requesters[pool.height]; r != nil {
@ -167,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
// Pop the first block at pool.height // Pop the first block at pool.height
// It must have been validated by 'second'.Commit from PeekTwoBlocks(). // It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() { func (pool *BlockPool) PopRequest() {
pool.mtx.Lock() // Lock pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil { if r := pool.requesters[pool.height]; r != nil {
@ -187,7 +186,7 @@ func (pool *BlockPool) PopRequest() {
// Invalidates the block at pool.height, // Invalidates the block at pool.height,
// Remove the peer and redo request from others. // Remove the peer and redo request from others.
func (pool *BlockPool) RedoRequest(height int) { func (pool *BlockPool) RedoRequest(height int) {
pool.mtx.Lock() // Lock pool.mtx.Lock()
request := pool.requesters[height] request := pool.requesters[height]
pool.mtx.Unlock() pool.mtx.Unlock()
@ -196,12 +195,12 @@ func (pool *BlockPool) RedoRequest(height int) {
} }
// RemovePeer will redo all requesters associated with this peer. // RemovePeer will redo all requesters associated with this peer.
// TODO: record this malfeasance // TODO: record this malfeasance
pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx pool.RemovePeer(request.peerID)
} }
// TODO: ensure that blocks come in order for each peer. // TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
pool.mtx.Lock() // Lock pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
requester := pool.requesters[block.Height] requester := pool.requesters[block.Height]
@ -211,11 +210,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
if requester.setBlock(block, peerID) { if requester.setBlock(block, peerID) {
pool.numPending-- pool.numPending--
peer := pool.getPeer(peerID) peer := pool.peers[peerID]
peer.decrPending(blockSize)
pool.peersMtx.Lock()
peer.decrPending(blockSize, pool.onTimeout(peer))
pool.peersMtx.Unlock()
} else { } else {
// Bad peer? // Bad peer?
} }
@ -223,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
// Sets the peer's alleged blockchain height. // Sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID string, height int) { func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
pool.peersMtx.Lock() // Lock pool.mtx.Lock()
defer pool.peersMtx.Unlock() defer pool.mtx.Unlock()
peer := pool.peers[peerID] peer := pool.peers[peerID]
if peer != nil { if peer != nil {
@ -236,18 +232,13 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
} }
func (pool *BlockPool) RemovePeer(peerID string) { func (pool *BlockPool) RemovePeer(peerID string) {
pool.peersMtx.Lock() // Lock pool.mtx.Lock()
defer pool.peersMtx.Unlock() defer pool.mtx.Unlock()
pool.removePeer(peerID) pool.removePeer(peerID)
} }
func (pool *BlockPool) removePeer(peerID string) { func (pool *BlockPool) removePeer(peerID string) {
// need to lock pool to access requesters and numPending.
// peersMtx should be locked by caller
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, requester := range pool.requesters { for _, requester := range pool.requesters {
if requester.getPeerID() == peerID { if requester.getPeerID() == peerID {
pool.numPending++ pool.numPending++
@ -257,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) {
delete(pool.peers, peerID) delete(pool.peers, peerID)
} }
func (pool *BlockPool) getPeer(peerID string) *bpPeer {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
peer := pool.peers[peerID]
return peer
}
// Pick an available peer with at least the given minHeight. // Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil. // If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
pool.peersMtx.Lock() pool.mtx.Lock()
defer pool.peersMtx.Unlock() defer pool.mtx.Unlock()
for _, peer := range pool.peers { for _, peer := range pool.peers {
if peer.isBad() { if peer.didTimeout {
pool.removePeer(peer.id) pool.removePeer(peer.id)
continue continue
} else { } else {
@ -283,14 +266,14 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
if peer.height < minHeight { if peer.height < minHeight {
continue continue
} }
peer.incrPending(pool.onTimeout(peer)) peer.incrPending()
return peer return peer
} }
return nil return nil
} }
func (pool *BlockPool) makeNextRequester() { func (pool *BlockPool) makeNextRequester() {
pool.mtx.Lock() // Lock pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
nextHeight := pool.height + len(pool.requesters) nextHeight := pool.height + len(pool.requesters)
@ -316,14 +299,6 @@ func (pool *BlockPool) sendTimeout(peerID string) {
pool.timeoutsCh <- peerID pool.timeoutsCh <- peerID
} }
func (pool *BlockPool) onTimeout(peer *bpPeer) func() {
return func() {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
peer.onTimeout()
}
}
func (pool *BlockPool) debug() string { func (pool *BlockPool) debug() string {
pool.mtx.Lock() // Lock pool.mtx.Lock() // Lock
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
@ -345,11 +320,13 @@ func (pool *BlockPool) debug() string {
type bpPeer struct { type bpPeer struct {
pool *BlockPool pool *BlockPool
id string id string
height int
numPending int32
recvMonitor *flow.Monitor recvMonitor *flow.Monitor
timeout *time.Timer
didTimeout bool mtx sync.Mutex
height int
numPending int32
timeout *time.Timer
didTimeout bool
} }
func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
@ -368,43 +345,41 @@ func (peer *bpPeer) resetMonitor() {
peer.recvMonitor.SetREMA(initialValue) peer.recvMonitor.SetREMA(initialValue)
} }
// needs the closure so we can lock the peersMtx func (peer *bpPeer) resetTimeout() {
func (peer *bpPeer) resetTimeout(callback func()) {
if peer.timeout == nil { if peer.timeout == nil {
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback) peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
} else { } else {
peer.timeout.Reset(time.Second * peerTimeoutSeconds) peer.timeout.Reset(time.Second * peerTimeoutSeconds)
} }
} }
func (peer *bpPeer) incrPending(onTimeout func()) { func (peer *bpPeer) incrPending() {
if peer.numPending == 0 { if peer.numPending == 0 {
peer.resetMonitor() peer.resetMonitor()
peer.resetTimeout(onTimeout) peer.resetTimeout()
} }
peer.numPending++ peer.numPending++
} }
func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) { func (peer *bpPeer) decrPending(recvSize int) {
peer.numPending-- peer.numPending--
if peer.numPending == 0 { if peer.numPending == 0 {
peer.timeout.Stop() peer.timeout.Stop()
} else { } else {
peer.recvMonitor.Update(recvSize) peer.recvMonitor.Update(recvSize)
peer.resetTimeout(onTimeout) peer.resetTimeout()
} }
} }
func (peer *bpPeer) onTimeout() { func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()
peer.pool.sendTimeout(peer.id) peer.pool.sendTimeout(peer.id)
log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout") log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
peer.didTimeout = true peer.didTimeout = true
} }
func (peer *bpPeer) isBad() bool {
return peer.didTimeout
}
//------------------------------------- //-------------------------------------
type bpRequester struct { type bpRequester struct {