eth/downloader: fix expiration not running while fetching

Péter Szilágyi 2015-05-07 12:59:19 +03:00
parent 4800c94392
commit 45f8304f3c
2 changed files with 32 additions and 34 deletions

eth/downloader/downloader.go

@@ -346,13 +346,28 @@ out:
                 d.peers.setState(blockPack.peerId, idleState)
             }
         case <-ticker.C:
-            // after removing bad peers make sure we actually have sufficient peer left to keep downloading
+            // Check for bad peers. Bad peers may indicate a peer not responding
+            // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
+            // that badly or poorly behave are removed from the peer set (not banned).
+            // Bad peers are excluded from the available peer set and therefor won't be
+            // reused. XXX We could re-introduce peers after X time.
+            badPeers := d.queue.Expire(blockTtl)
+            for _, pid := range badPeers {
+                // XXX We could make use of a reputation system here ranking peers
+                // in their performance
+                // 1) Time for them to respond;
+                // 2) Measure their speed;
+                // 3) Amount and availability.
+                if peer := d.peers[pid]; peer != nil {
+                    peer.demote()
+                    peer.reset()
+                }
+            }
+            // After removing bad peers make sure we actually have sufficient peer left to keep downloading
             if len(d.peers) == 0 {
                 d.queue.Reset()
                 return errNoPeers
             }
             // If there are unrequested hashes left start fetching
             // from the available peers.
             if d.queue.Pending() > 0 {
@@ -392,25 +407,6 @@ out:
                 // safely assume we're done. Another part of the process will check
                 // for parent errors and will re-request anything that's missing
                 break out
-            } else {
-                // Check for bad peers. Bad peers may indicate a peer not responding
-                // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
-                // that badly or poorly behave are removed from the peer set (not banned).
-                // Bad peers are excluded from the available peer set and therefor won't be
-                // reused. XXX We could re-introduce peers after X time.
-                badPeers := d.queue.Expire(blockTtl)
-                for _, pid := range badPeers {
-                    // XXX We could make use of a reputation system here ranking peers
-                    // in their performance
-                    // 1) Time for them to respond;
-                    // 2) Measure their speed;
-                    // 3) Amount and availability.
-                    if peer := d.peers[pid]; peer != nil {
-                        peer.demote()
-                        peer.reset()
-                    }
-                }
             }
         }
     }
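
The hunk above is the actual fix named in the commit title: the queue.Expire call and the demotion of unresponsive peers now run on every tick of the timeout ticker, whereas previously (second hunk) they only ran in the else branch reached when nothing could be fetched, so stalled requests could linger indefinitely during an active download. Below is a minimal, self-contained sketch of that expire-on-every-tick pattern; the types and identifiers (fetchQueue, request, the timeouts) are illustrative stand-ins, not go-ethereum's own.

package main

import (
    "fmt"
    "time"
)

// request models an in-flight fetch; only the fields needed for expiry are kept.
type request struct {
    peer string
    sent time.Time
}

// fetchQueue is a toy stand-in for the downloader's queue.
type fetchQueue struct {
    pending map[string]*request // peer id -> outstanding request
}

// expire drops requests older than timeout and reports the offending peers.
func (q *fetchQueue) expire(timeout time.Duration) []string {
    var bad []string
    for id, req := range q.pending {
        if time.Since(req.sent) > timeout {
            delete(q.pending, id)
            bad = append(bad, id)
        }
    }
    return bad
}

func main() {
    q := &fetchQueue{pending: map[string]*request{
        "peer1": {peer: "peer1", sent: time.Now().Add(-10 * time.Second)},
    }}
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    // The essence of the fix: run the expiry check on every tick,
    // not only when there is nothing left to fetch.
    for range ticker.C {
        for _, pid := range q.expire(5 * time.Second) {
            fmt.Println("demoting unresponsive peer:", pid)
        }
        return // one pass is enough for the demonstration
    }
}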

eth/downloader/queue.go

@@ -12,7 +12,7 @@ import (
 )
 const (
-    blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download
+    blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
 )
 // fetchRequest is a currently running block retrieval operation.
@@ -28,8 +28,7 @@ type queue struct {
     hashQueue   *prque.Prque // Priority queue of the block hashes to fetch
     hashCounter int          // Counter indexing the added hashes to ensure retrieval order
     pendPool  map[string]*fetchRequest // Currently pending block retrieval operations
-    pendCount int                      // Number of pending block fetches (to throttle the download)
     blockPool  map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
     blockCache []*types.Block      // Downloaded but not yet delivered blocks
@@ -58,7 +57,6 @@ func (q *queue) Reset() {
     q.hashCounter = 0
     q.pendPool = make(map[string]*fetchRequest)
-    q.pendCount = 0
     q.blockPool = make(map[common.Hash]int)
     q.blockOffset = 0
@@ -106,7 +104,13 @@ func (q *queue) Throttle() bool {
     q.lock.RLock()
     defer q.lock.RUnlock()
-    return q.pendCount >= len(q.blockCache)-len(q.blockPool)
+    // Calculate the currently in-flight block requests
+    pending := 0
+    for _, request := range q.pendPool {
+        pending += len(request.Hashes)
+    }
+    // Throttle if more blocks are in-flight than free space in the cache
+    return pending >= len(q.blockCache)-len(q.blockPool)
 }
 // Has checks if a hash is within the download queue or not.
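
With the pendCount field gone from the queue struct (and from Reset above), Throttle now derives the number of in-flight blocks by summing len(request.Hashes) over pendPool, so the figure cannot drift out of sync with the pool the way a separately maintained counter could. The fragment below models that rule with simplified, hypothetical types; it and the two sketches further down assume a single illustrative Go package with "time" imported, and the real Hashes map is keyed by common.Hash rather than string.

// Simplified stand-ins for the queue's types: fetchRequest here keeps only the
// fields the diff touches (the real one carries more state).
type fetchRequest struct {
    Hashes map[string]int // requested hashes with their queue priority
    Time   time.Time      // time the request was made
}

// throttle mirrors the rule introduced above: pause fetching when more blocks
// are in flight than there is free space left in the block cache.
func throttle(pendPool map[string]*fetchRequest, cacheSize, downloaded int) bool {
    pending := 0
    for _, request := range pendPool {
        pending += len(request.Hashes)
    }
    return pending >= cacheSize-downloaded
}
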
@@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
     q.lock.Lock()
     defer q.lock.Unlock()
-    // Short circuit if the pool has been depleted
+    // Short circuit if the pool has been depleted, or if the peer's already
+    // downloading something (sanity check not to corrupt state)
     if q.hashQueue.Empty() {
         return nil
     }
+    if _, ok := q.pendPool[p.id]; ok {
+        return nil
+    }
     // Retrieve a batch of hashes, skipping previously failed ones
     send := make(map[common.Hash]int)
     skip := make(map[common.Hash]int)
@@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
         Time: time.Now(),
     }
     q.pendPool[p.id] = request
-    q.pendCount += len(request.Hashes)
     return request
 }
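
Besides dropping the pendCount bookkeeping, Reserve gains a short circuit: a peer that already has an entry in pendPool is refused a second reservation, so each peer holds at most one outstanding request and overlapping reservations cannot corrupt the pool. Continuing the simplified model from the Throttle sketch, the guard amounts to roughly the following (build stands in for the hash-selection logic elided here):

// reserve hands a new request only to peers that are currently idle; build is a
// placeholder for the hash-selection step the real Reserve performs.
func reserve(pendPool map[string]*fetchRequest, peerID string, build func() *fetchRequest) *fetchRequest {
    if _, ok := pendPool[peerID]; ok {
        return nil // peer is already downloading something
    }
    request := build()
    pendPool[peerID] = request
    return request
}
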
@@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) {
         q.hashQueue.Push(hash, float32(index))
     }
     delete(q.pendPool, request.Peer.id)
-    q.pendCount -= len(request.Hashes)
 }
 // Expire checks for in flight requests that exceeded a timeout allowance,
@@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string {
             for hash, index := range request.Hashes {
                 q.hashQueue.Push(hash, float32(index))
             }
-            q.pendCount -= len(request.Hashes)
             peers = append(peers, id)
         }
     }
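
Expire is the queue-side half of the fix exercised by the ticker case in downloader.go: any request older than the timeout has its hashes pushed back into hashQueue so other peers can pick them up, and the responsible peer ids are returned for demotion; with pendCount removed there is no counter left to decrement here. In the same simplified model, the behaviour is roughly:

// expire re-queues the hashes of timed-out requests and reports their peers;
// requeue stands in for pushing a hash back into the priority queue.
func expire(pendPool map[string]*fetchRequest, timeout time.Duration, requeue func(hash string, priority int)) []string {
    var peers []string
    for id, request := range pendPool {
        if time.Since(request.Time) > timeout {
            for hash, index := range request.Hashes {
                requeue(hash, index)
            }
            peers = append(peers, id)
        }
    }
    return peers
}
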
@@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
     }
     delete(q.pendPool, id)
-    // Mark all the hashes in the request as non-pending
-    q.pendCount -= len(request.Hashes)
     // If no blocks were retrieved, mark them as unavailable for the origin peer
     if len(blocks) == 0 {
         for hash, _ := range request.Hashes {