variable renames

This commit is contained in:
Jae Kwon 2015-05-28 01:59:23 -07:00
parent 8a5c320472
commit b281674058
2 changed files with 33 additions and 37 deletions

View File

@ -18,8 +18,6 @@ const (
maxRequestsPerPeer = 300 maxRequestsPerPeer = 300
) )
// numTotal = numPending + blocks in the pool we haven't synced yet
var ( var (
requestTimeoutSeconds = time.Duration(3) requestTimeoutSeconds = time.Duration(3)
) )
@ -37,12 +35,11 @@ var (
type BlockPool struct { type BlockPool struct {
// block requests // block requests
requestsMtx sync.Mutex requestsMtx sync.Mutex
requests map[uint]*bpRequest requests map[uint]*bpRequest
peerless int32 // number of requests without peers height uint // the lowest key in requests.
height uint // the lowest key in requests. numUnassigned int32 // number of requests not yet assigned to a peer
numPending int32 numWaiting int32 // number of requests awaiting response from a peer
numTotal int32
// peers // peers
peersMtx sync.Mutex peersMtx sync.Mutex
@ -59,10 +56,10 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<-
return &BlockPool{ return &BlockPool{
peers: make(map[string]*bpPeer), peers: make(map[string]*bpPeer),
requests: make(map[uint]*bpRequest), requests: make(map[uint]*bpRequest),
height: start, height: start,
numPending: 0, numUnassigned: 0,
numTotal: 0, numWaiting: 0,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, timeoutsCh: timeoutsCh,
@ -97,11 +94,11 @@ RUN_LOOP:
if atomic.LoadInt32(&pool.running) == 0 { if atomic.LoadInt32(&pool.running) == 0 {
break RUN_LOOP break RUN_LOOP
} }
_, numPending, numTotal := pool.GetStatus() _, numWaiting := pool.GetStatus()
if numPending >= maxPendingRequests { if numWaiting >= maxPendingRequests {
// sleep for a bit. // sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond) time.Sleep(requestIntervalMS * time.Millisecond)
} else if numTotal >= maxTotalRequests { } else if len(pool.requests) >= maxTotalRequests {
// sleep for a bit. // sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond) time.Sleep(requestIntervalMS * time.Millisecond)
} else { } else {
@ -112,11 +109,11 @@ RUN_LOOP:
} }
} }
func (pool *BlockPool) GetStatus() (uint, int32, int32) { func (pool *BlockPool) GetStatus() (uint, int32) {
pool.requestsMtx.Lock() // Lock pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
return pool.height, pool.numPending, pool.numTotal return pool.height, pool.numWaiting
} }
// We need to see the second block's Validation to validate the first block. // We need to see the second block's Validation to validate the first block.
@ -146,7 +143,6 @@ func (pool *BlockPool) PopRequest() {
delete(pool.requests, pool.height) delete(pool.requests, pool.height)
pool.height++ pool.height++
pool.numTotal--
} }
// Invalidates the block at pool.height. // Invalidates the block at pool.height.
@ -164,8 +160,8 @@ func (pool *BlockPool) RedoRequest(height uint) {
pool.RemovePeer(request.peerId) // Lock on peersMtx. pool.RemovePeer(request.peerId) // Lock on peersMtx.
request.block = nil request.block = nil
request.peerId = "" request.peerId = ""
pool.numPending++ pool.numWaiting++
pool.peerless++ pool.numUnassigned++
go requestRoutine(pool, height) go requestRoutine(pool, height)
} }
@ -186,7 +182,7 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
if request == nil { if request == nil {
return return
} }
pool.peerless-- pool.numUnassigned--
request.peerId = peerId request.peerId = peerId
} }
@ -198,7 +194,7 @@ func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
if request == nil { if request == nil {
return return
} }
pool.peerless++ pool.numUnassigned++
request.peerId = "" request.peerId = ""
} }
@ -217,7 +213,7 @@ func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
return return
} }
request.block = block request.block = block
pool.numPending-- pool.numWaiting--
} }
func (pool *BlockPool) getPeer(peerId string) *bpPeer { func (pool *BlockPool) getPeer(peerId string) *bpPeer {
@ -288,7 +284,7 @@ func (pool *BlockPool) nextHeight() uint {
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
// we make one request per height. // we make one request per height.
return pool.height + uint(pool.numTotal) return pool.height + uint(len(pool.requests))
} }
func (pool *BlockPool) makeRequest(height uint) { func (pool *BlockPool) makeRequest(height uint) {
@ -302,12 +298,11 @@ func (pool *BlockPool) makeRequest(height uint) {
} }
pool.requests[height] = request pool.requests[height] = request
pool.peerless++ pool.numUnassigned++
nextHeight := pool.height + uint(pool.numTotal) nextHeight := pool.height + uint(len(pool.requests))
if nextHeight == height { if nextHeight == height {
pool.numTotal++ pool.numWaiting++
pool.numPending++
} }
go requestRoutine(pool, height) go requestRoutine(pool, height)
@ -332,7 +327,7 @@ func (pool *BlockPool) debug() string {
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
str := "" str := ""
for h := pool.height; h < pool.height+uint(pool.numTotal); h++ { for h := pool.height; h < pool.height+uint(len(pool.requests)); h++ {
if pool.requests[h] == nil { if pool.requests[h] == nil {
str += Fmt("H(%v):X ", h) str += Fmt("H(%v):X ", h)
} else { } else {
@ -379,7 +374,7 @@ func requestRoutine(pool *BlockPool, height uint) {
break PICK_LOOP break PICK_LOOP
} }
// set the peer, decrement peerless // set the peer, decrement numUnassigned
pool.setPeerForRequest(height, peer.id) pool.setPeerForRequest(height, peer.id)
for try := 0; try < maxTries; try++ { for try := 0; try < maxTries; try++ {
@ -391,14 +386,14 @@ func requestRoutine(pool *BlockPool, height uint) {
return return
} }
// or already processed and we've moved past it // or already processed and we've moved past it
bpHeight, _, _ := pool.GetStatus() bpHeight, _ := pool.GetStatus()
if height < bpHeight { if height < bpHeight {
pool.decrPeer(peer.id) pool.decrPeer(peer.id)
return return
} }
} }
// unset the peer, increment peerless // unset the peer, increment numUnassigned
pool.removePeerForRequest(height, peer.id) pool.removePeerForRequest(height, peer.id)
// this peer failed us, try again // this peer failed us, try again

View File

@ -195,13 +195,14 @@ FOR_LOOP:
// ask for status updates // ask for status updates
go bcR.BroadcastStatusRequest() go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C: case _ = <-switchToConsensusTicker.C:
// not thread safe access for peerless and numPending but should be fine // not thread safe access for numUnassigned and numWaiting but should be fine
log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) // TODO make threadsafe and use exposed functions
log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numWaiting", bcR.pool.numWaiting, "total", len(bcR.pool.requests))
// NOTE: this condition is very strict right now. may need to weaken // NOTE: this condition is very strict right now. may need to weaken
// if the max amount of requests are pending and peerless // if the max amount of requests are waiting and numUnassigned
// and we have some peers (say > 5), then we're caught up // and we have some peers (say > 5), then we're caught up
maxPending := bcR.pool.numPending == maxPendingRequests maxPending := bcR.pool.numWaiting == maxPendingRequests
maxPeerless := bcR.pool.peerless == bcR.pool.numPending maxPeerless := bcR.pool.numUnassigned == bcR.pool.numWaiting
o, i, _ := bcR.sw.NumPeers() o, i, _ := bcR.sw.NumPeers()
enoughPeers := o+i >= 5 enoughPeers := o+i >= 5
if maxPending && maxPeerless && enoughPeers { if maxPending && maxPeerless && enoughPeers {