quorum/les/fetcher.go

296 lines
7.4 KiB
Go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements the Light Ethereum Subprotocol.
package les
import (
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
)
type lightFetcher struct {
pm *ProtocolManager
odr *LesOdr
chain BlockChain
headAnnouncedMu sync.Mutex
headAnnouncedBy map[common.Hash][]*peer
currentTd *big.Int
deliverChn chan fetchResponse
reqMu sync.RWMutex
requested map[uint64]fetchRequest
timeoutChn chan uint64
notifyChn chan bool // true if initiated from outside
syncing bool
syncDone chan struct{}
}
type fetchRequest struct {
hash common.Hash
amount uint64
peer *peer
}
type fetchResponse struct {
reqID uint64
headers []*types.Header
}
func newLightFetcher(pm *ProtocolManager) *lightFetcher {
f := &lightFetcher{
pm: pm,
chain: pm.blockchain,
odr: pm.odr,
headAnnouncedBy: make(map[common.Hash][]*peer),
deliverChn: make(chan fetchResponse, 100),
requested: make(map[uint64]fetchRequest),
timeoutChn: make(chan uint64),
notifyChn: make(chan bool, 100),
syncDone: make(chan struct{}),
currentTd: big.NewInt(0),
}
go f.syncLoop()
return f
}
func (f *lightFetcher) notify(p *peer, head *announceData) {
var headHash common.Hash
if head == nil {
// initial notify
headHash = p.Head()
} else {
if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil {
head.haveHeaders = head.Number
}
//fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders)
if !p.addNotify(head) {
//fmt.Println("addNotify fail")
f.pm.removePeer(p.id)
}
headHash = head.Hash
}
f.headAnnouncedMu.Lock()
f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p)
f.headAnnouncedMu.Unlock()
f.notifyChn <- true
}
func (f *lightFetcher) gotHeader(header *types.Header) {
f.headAnnouncedMu.Lock()
defer f.headAnnouncedMu.Unlock()
hash := header.Hash()
peerList := f.headAnnouncedBy[hash]
if peerList == nil {
return
}
number := header.Number.Uint64()
td := core.GetTd(f.pm.chainDb, hash, number)
for _, peer := range peerList {
peer.lock.Lock()
ok := peer.gotHeader(hash, number, td)
peer.lock.Unlock()
if !ok {
//fmt.Println("gotHeader fail")
f.pm.removePeer(peer.id)
}
}
delete(f.headAnnouncedBy, hash)
}
func (f *lightFetcher) nextRequest() (*peer, *announceData) {
var bestPeer *peer
bestTd := f.currentTd
for _, peer := range f.pm.peers.AllPeers() {
peer.lock.RLock()
if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 ||
(bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) {
bestPeer = peer
bestTd = peer.headInfo.Td
}
peer.lock.RUnlock()
}
if bestPeer == nil {
return nil, nil
}
bestPeer.lock.Lock()
res := bestPeer.headInfo
res.requested = true
bestPeer.lock.Unlock()
for _, peer := range f.pm.peers.AllPeers() {
if peer != bestPeer {
peer.lock.Lock()
if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders {
peer.headInfo.requested = true
}
peer.lock.Unlock()
}
}
return bestPeer, res
}
func (f *lightFetcher) deliverHeaders(reqID uint64, headers []*types.Header) {
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers}
}
func (f *lightFetcher) requestedID(reqID uint64) bool {
f.reqMu.RLock()
_, ok := f.requested[reqID]
f.reqMu.RUnlock()
return ok
}
func (f *lightFetcher) request(p *peer, block *announceData) {
//fmt.Println("request", p.id, block.Number, block.haveHeaders)
amount := block.Number - block.haveHeaders
if amount == 0 {
return
}
if amount > 100 {
f.syncing = true
go func() {
//fmt.Println("f.pm.synchronise(p)")
f.pm.synchronise(p)
//fmt.Println("sync done")
f.syncDone <- struct{}{}
}()
return
}
reqID := f.odr.getNextReqID()
f.reqMu.Lock()
f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p}
f.reqMu.Unlock()
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true)
go func() {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
}
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
return false
}
headers := make([]*types.Header, req.amount)
for i, header := range resp.headers {
headers[int(req.amount)-1-i] = header
}
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
return false
}
for _, header := range headers {
td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64())
if td == nil {
return false
}
if td.Cmp(f.currentTd) > 0 {
f.currentTd = td
}
f.gotHeader(header)
}
return true
}
func (f *lightFetcher) checkSyncedHeaders() {
//fmt.Println("checkSyncedHeaders()")
for _, peer := range f.pm.peers.AllPeers() {
peer.lock.Lock()
h := peer.firstHeadInfo
remove := false
loop:
for h != nil {
if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil {
//fmt.Println(" found", h.Number)
ok := peer.gotHeader(h.Hash, h.Number, td)
if !ok {
remove = true
break loop
}
if td.Cmp(f.currentTd) > 0 {
f.currentTd = td
}
}
h = h.next
}
peer.lock.Unlock()
if remove {
//fmt.Println("checkSync fail")
f.pm.removePeer(peer.id)
}
}
}
func (f *lightFetcher) syncLoop() {
f.pm.wg.Add(1)
defer f.pm.wg.Done()
srtoNotify := false
for {
select {
case <-f.pm.quitSync:
return
case ext := <-f.notifyChn:
//fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify)
s := srtoNotify
srtoNotify = false
if !f.syncing && !(ext && s) {
if p, r := f.nextRequest(); r != nil {
srtoNotify = true
go func() {
time.Sleep(softRequestTimeout)
f.notifyChn <- false
}()
f.request(p, r)
}
}
case reqID := <-f.timeoutChn:
f.reqMu.Lock()
req, ok := f.requested[reqID]
if ok {
delete(f.requested, reqID)
}
f.reqMu.Unlock()
if ok {
//fmt.Println("hard timeout")
f.pm.removePeer(req.peer.id)
}
case resp := <-f.deliverChn:
//fmt.Println("<-f.deliverChn", f.syncing)
f.reqMu.Lock()
req, ok := f.requested[resp.reqID]
delete(f.requested, resp.reqID)
f.reqMu.Unlock()
if !ok || !(f.syncing || f.processResponse(req, resp)) {
//fmt.Println("processResponse fail")
f.pm.removePeer(req.peer.id)
}
case <-f.syncDone:
//fmt.Println("<-f.syncDone", f.syncing)
f.checkSyncedHeaders()
f.syncing = false
}
}
}