// Package fetcher contains the block announcement based synchonisation. package fetcher import ( "errors" "math/rand" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) const ( arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block ) var ( errTerminated = errors.New("terminated") ) // hashCheckFn is a callback type for verifying a hash's presence in the local chain. type hashCheckFn func(common.Hash) bool // blockRequesterFn is a callback type for sending a block retrieval request. type blockRequesterFn func([]common.Hash) error // blockImporterFn is a callback type for trying to inject a block into the local chain. type blockImporterFn func(peer string, block *types.Block) error // announce is the hash notification of the availability of a new block in the // network. type announce struct { hash common.Hash // Hash of the block being announced time time.Time // Timestamp of the announcement origin string // Identifier of the peer originating the notification fetch blockRequesterFn // Fetcher function to retrieve } // Fetcher is responsible for accumulating block announcements from various peers // and scheduling them for retrieval. type Fetcher struct { // Various event channels notify chan *announce filter chan chan []*types.Block quit chan struct{} // Callbacks hasBlock hashCheckFn // Checks if a block is present in the chain importBlock blockImporterFn // Injects a block from an origin peer into the chain } // New creates a block fetcher to retrieve blocks based on hash announcements. func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher { return &Fetcher{ notify: make(chan *announce), filter: make(chan chan []*types.Block), quit: make(chan struct{}), hasBlock: hasBlock, importBlock: importBlock, } } // Start boots up the announcement based synchoniser, accepting and processing // hash notifications and block fetches until termination requested. func (f *Fetcher) Start() { go f.loop() } // Stop terminates the announcement based synchroniser, canceling all pending // operations. func (f *Fetcher) Stop() { close(f.quit) } // Notify announces the fetcher of the potential availability of a new block in // the network. func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher blockRequesterFn) error { block := &announce{ hash: hash, time: time, origin: peer, fetch: fetcher, } select { case f.notify <- block: return nil case <-f.quit: return errTerminated } } // Filter extracts all the blocks that were explicitly requested by the fetcher, // returning those that should be handled differently. func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { // Send the filter channel to the fetcher filter := make(chan []*types.Block) select { case f.filter <- filter: case <-f.quit: return nil } // Request the filtering of the block list select { case filter <- blocks: case <-f.quit: return nil } // Retrieve the blocks remaining after filtering select { case blocks := <-filter: return blocks case <-f.quit: return nil } } // Loop is the main fetcher loop, checking and processing various notification // events. func (f *Fetcher) loop() { announced := make(map[common.Hash][]*announce) fetching := make(map[common.Hash]*announce) fetch := time.NewTimer(0) done := make(chan common.Hash) // Iterate the block fetching until a quit is requested for { // Clean up any expired block fetches for hash, announce := range fetching { if time.Since(announce.time) > fetchTimeout { delete(announced, hash) delete(fetching, hash) } } // Wait for an outside event to occur select { case <-f.quit: // Fetcher terminating, abort all operations return case notification := <-f.notify: // A block was announced, schedule if it's not yet downloading glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) if _, ok := fetching[notification.hash]; ok { break } if len(announced) == 0 { glog.V(logger.Detail).Infof("Scheduling fetch in %v, at %v", arriveTimeout-time.Since(notification.time), notification.time.Add(arriveTimeout)) fetch.Reset(arriveTimeout - time.Since(notification.time)) } announced[notification.hash] = append(announced[notification.hash], notification) case hash := <-done: // A pending import finished, remove all traces of the notification delete(announced, hash) delete(fetching, hash) case <-fetch.C: // At least one block's timer ran out, check for needing retrieval request := make(map[string][]common.Hash) for hash, announces := range announced { if time.Since(announces[0].time) > arriveTimeout { announce := announces[rand.Intn(len(announces))] if !f.hasBlock(hash) { request[announce.origin] = append(request[announce.origin], hash) fetching[hash] = announce } delete(announced, hash) } } // Send out all block requests for peer, hashes := range request { glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) go fetching[hashes[0]].fetch(hashes) } // Schedule the next fetch if blocks are still pending if len(announced) > 0 { nearest := time.Now() for _, announces := range announced { if nearest.After(announces[0].time) { nearest = announces[0].time } } glog.V(logger.Detail).Infof("Rescheduling fetch in %v, at %v", arriveTimeout-time.Since(nearest), nearest.Add(arriveTimeout)) fetch.Reset(arriveTimeout - time.Since(nearest)) } case filter := <-f.filter: // Blocks arrived, extract any explicit fetches, return all else var blocks types.Blocks select { case blocks = <-filter: case <-f.quit: return } explicit, download := []*types.Block{}, []*types.Block{} for _, block := range blocks { hash := block.Hash() // Filter explicitly requested blocks from hash announcements if _, ok := fetching[hash]; ok { // Discard if already imported by other means if !f.hasBlock(hash) { explicit = append(explicit, block) } else { delete(fetching, hash) } } else { download = append(download, block) } } select { case filter <- download: case <-f.quit: return } // Create a closure with the retrieved blocks and origin peers peers := make([]string, 0, len(explicit)) blocks = make([]*types.Block, 0, len(explicit)) for _, block := range explicit { hash := block.Hash() if announce := fetching[hash]; announce != nil { // Drop the block if it surely cannot fit if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) { // delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout? continue } // Otherwise accumulate for import peers = append(peers, announce.origin) blocks = append(blocks, block) } } // If any explicit fetches were replied to, import them if count := len(blocks); count > 0 { glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) go func() { // Make sure all hashes are cleaned up for _, block := range blocks { hash := block.Hash() defer func() { done <- hash }() } // Try and actually import the blocks for i := 0; i < len(blocks); i++ { if err := f.importBlock(peers[i], blocks[i]); err != nil { glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) return } } }() } } } }