eth/downloader: always send termination wakes, clean leftover

This commit is contained in:
Péter Szilágyi 2015-09-23 12:39:17 +03:00
parent e456f27795
commit f459a3f0ae
1 changed files with 36 additions and 20 deletions

View File

@ -154,7 +154,7 @@ type Downloader struct {
blockCh chan blockPack // [eth/61] Channel receiving inbound blocks blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
headerCh chan headerPack // [eth/62] Channel receiving inbound block headers headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies
processCh chan bool // Channel to signal the block fetcher of new or finished work wakeCh chan bool // Channel to signal the block/body fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
blockCh: make(chan blockPack, 1), blockCh: make(chan blockPack, 1),
headerCh: make(chan headerPack, 1), headerCh: make(chan headerPack, 1),
bodyCh: make(chan bodyPack, 1), bodyCh: make(chan bodyPack, 1),
processCh: make(chan bool, 1), wakeCh: make(chan bool, 1),
} }
} }
@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset() d.queue.Reset()
d.peers.Reset() d.peers.Reset()
select {
case <-d.wakeCh:
default:
}
// Create cancel channel for aborting mid-flight // Create cancel channel for aborting mid-flight
d.cancelLock.Lock() d.cancelLock.Lock()
d.cancelCh = make(chan struct{}) d.cancelCh = make(chan struct{})
@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available hashes", p) glog.V(logger.Debug).Infof("%v: no available hashes", p)
select { select {
case d.processCh <- false: case d.wakeCh <- false:
case <-d.cancelCh: case <-d.cancelCh:
} }
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a // If no hashes were retrieved at all, the peer violated it's TD promise that it had a
@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return errBadPeer return errBadPeer
} }
// Notify the block fetcher of new hashes, but stop if queue is full // Notify the block fetcher of new hashes, but stop if queue is full
cont := d.queue.Pending() < maxQueuedHashes if d.queue.Pending() < maxQueuedHashes {
select { // We still have hashes to fetch, send continuation wake signal (potential)
case d.processCh <- cont: select {
default: case d.wakeCh <- true:
} default:
if !cont { }
} else {
// Hash limit reached, send a termination wake signal (enforced)
select {
case d.wakeCh <- false:
case <-d.cancelCh:
}
return nil return nil
} }
// Queue not yet full, fetch the next batch // Queue not yet full, fetch the next batch
@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
default: default:
} }
case cont := <-d.processCh: case cont := <-d.wakeCh:
// The hash fetcher sent a continuation flag, check if it's done // The hash fetcher sent a continuation flag, check if it's done
if !cont { if !cont {
finished = true finished = true
@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available headers", p) glog.V(logger.Debug).Infof("%v: no available headers", p)
select { select {
case d.processCh <- false: case d.wakeCh <- false:
case <-d.cancelCh: case <-d.cancelCh:
} }
// If no headers were retrieved at all, the peer violated it's TD promise that it had a // If no headers were retrieved at all, the peer violated it's TD promise that it had a
@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errBadPeer return errBadPeer
} }
// Notify the block fetcher of new headers, but stop if queue is full // Notify the block fetcher of new headers, but stop if queue is full
cont := d.queue.Pending() < maxQueuedHeaders if d.queue.Pending() < maxQueuedHeaders {
select { // We still have headers to fetch, send continuation wake signal (potential)
case d.processCh <- cont: select {
default: case d.wakeCh <- true:
} default:
if !cont { }
} else {
// Header limit reached, send a termination wake signal (enforced)
select {
case d.wakeCh <- false:
case <-d.cancelCh:
}
return nil return nil
} }
// Queue not yet full, fetch the next batch // Queue not yet full, fetch the next batch
@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// Finish the sync gracefully instead of dumping the gathered data though // Finish the sync gracefully instead of dumping the gathered data though
select { select {
case d.processCh <- false: case d.wakeCh <- false:
default: case <-d.cancelCh:
} }
return nil return nil
} }
@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
default: default:
} }
case cont := <-d.processCh: case cont := <-d.wakeCh:
// The header fetcher sent a continuation flag, check if it's done // The header fetcher sent a continuation flag, check if it's done
if !cont { if !cont {
finished = true finished = true