mirror of https://github.com/poanetwork/quorum.git
Added Raft BoundedFullSync functions which were overwritten as part of an upstream merge
parent ee4034f49e
commit b0896a48a6
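The hunks below restore the Raft command-line flags next to the relocated metrics flags. For orientation only (not part of this commit), a geth-style command would read the restored flags back through urfave/cli roughly as sketched here; setRaftConfig is a hypothetical helper and the imports (cli, log) are assumed from the surrounding cmd/utils package context:

    // Sketch only: consuming the Raft flags restored below via urfave/cli.
    // setRaftConfig is hypothetical and not defined anywhere in this diff.
    func setRaftConfig(ctx *cli.Context) {
        if ctx.GlobalBool(RaftModeFlag.Name) { // --raft
            blockTimeMs := ctx.GlobalInt(RaftBlockTimeFlag.Name) // --raftblocktime, defaults to 50
            joinID := ctx.GlobalInt(RaftJoinExistingFlag.Name)   // --raftjoinexisting, defaults to 0
            log.Info("using raft consensus", "blocktime(ms)", blockTimeMs, "joinExistingId", joinID)
        }
    }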
@@ -535,41 +535,21 @@ var (
        Value: whisper.DefaultMinimumPoW,
    }

    // Metrics flags
    MetricsEnabledFlag = cli.BoolFlag{
        Name:  metrics.MetricsEnabledFlag,
        Usage: "Enable metrics collection and reporting",
    }

    // Raft flags
    RaftModeFlag = cli.BoolFlag{
        Name:  "raft",
        Usage: "If enabled, uses Raft instead of Quorum Chain for consensus",
    }
    MetricsEnableInfluxDBFlag = cli.BoolFlag{
        Name:  "metrics.influxdb",
        Usage: "Enable metrics export/push to an external InfluxDB database",
    }
    RaftBlockTimeFlag = cli.IntFlag{
        Name:  "raftblocktime",
        Usage: "Amount of time between raft block creations in milliseconds",
        Value: 50,
    }
    MetricsInfluxDBEndpointFlag = cli.StringFlag{
        Name:  "metrics.influxdb.endpoint",
        Usage: "InfluxDB API endpoint to report metrics to",
        Value: "http://localhost:8086",
    }
    RaftJoinExistingFlag = cli.IntFlag{
        Name:  "raftjoinexisting",
        Usage: "The raft ID to assume when joining an pre-existing cluster",
        Value: 0,
    }
    MetricsInfluxDBDatabaseFlag = cli.StringFlag{
        Name:  "metrics.influxdb.database",
        Usage: "InfluxDB database name to push reported metrics to",
        Value: "geth",
    }
    EmitCheckpointsFlag = cli.BoolFlag{
        Name:  "emitcheckpoints",
        Usage: "If enabled, emit specially formatted logging checkpoints",
@@ -597,6 +577,26 @@ var (
        Usage: "Default minimum difference between two consecutive block's timestamps in seconds",
        Value: eth.DefaultConfig.Istanbul.BlockPeriod,
    }

    // Metrics flags
    MetricsEnabledFlag = cli.BoolFlag{
        Name:  metrics.MetricsEnabledFlag,
        Usage: "Enable metrics collection and reporting",
    }
    MetricsEnableInfluxDBFlag = cli.BoolFlag{
        Name:  "metrics.influxdb",
        Usage: "Enable metrics export/push to an external InfluxDB database",
    }
    MetricsInfluxDBEndpointFlag = cli.StringFlag{
        Name:  "metrics.influxdb.endpoint",
        Usage: "InfluxDB API endpoint to report metrics to",
        Value: "http://localhost:8086",
    }
    MetricsInfluxDBDatabaseFlag = cli.StringFlag{
        Name:  "metrics.influxdb.database",
        Usage: "InfluxDB database name to push reported metrics to",
        Value: "geth",
    }
    MetricsInfluxDBUsernameFlag = cli.StringFlag{
        Name:  "metrics.influxdb.username",
        Usage: "Username to authorize access to the database",
@@ -396,6 +396,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
    if p == nil {
        return errUnknownPeer
    }
    if d.mode == BoundedFullSync {
        err := d.syncWithPeerUntil(p, hash, td)
        if err == nil {
            d.processFullSyncContent()
        }
        return err
    }
    return d.syncWithPeer(p, hash, td)
}
@@ -1284,7 +1291,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
            }
        }
        // Unless we're doing light chains, schedule the headers for associated content retrieval
        if d.mode == FullSync || d.mode == FastSync {
        if d.mode == FullSync || d.mode == FastSync || d.mode == BoundedFullSync {
            // If we've reached the allowed number of pending headers, stall a bit
            for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
                select {
@@ -1640,3 +1647,214 @@ func (d *Downloader) requestTTL() time.Duration {
    }
    return ttl
}

// Extra downloader functionality for non-proof-of-work consensus

// Synchronizes with a peer, but only up to the provided Hash
func (d *Downloader) syncWithPeerUntil(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
    d.mux.Post(StartEvent{})
    defer func() {
        // reset on error
        if err != nil {
            d.mux.Post(FailedEvent{err})
        } else {
            d.mux.Post(DoneEvent{})
        }
    }()
    if p.version < 62 {
        return errTooOld
    }

    log.Info("Synchronising with the network", "id", p.id, "version", p.version)
    defer func(start time.Time) {
        log.Info("Synchronisation terminated", "duration", time.Since(start))
    }(time.Now())

    remoteHeader, err := d.fetchHeader(p, hash)
    if err != nil {
        return err
    }

    remoteHeight := remoteHeader.Number.Uint64()
    localHeight := d.blockchain.CurrentBlock().NumberU64()

    d.syncStatsLock.Lock()
    if d.syncStatsChainHeight <= localHeight || d.syncStatsChainOrigin > localHeight {
        d.syncStatsChainOrigin = localHeight
    }
    d.syncStatsChainHeight = remoteHeight
    d.syncStatsLock.Unlock()

    d.queue.Prepare(localHeight+1, d.mode)
    if d.syncInitHook != nil {
        d.syncInitHook(localHeight, remoteHeight)
    }

    pivot := uint64(0)

    fetchers := []func() error{
        func() error { return d.fetchBoundedHeaders(p, localHeight+1, remoteHeight) },
        func() error { return d.fetchBodies(localHeight + 1) },
        func() error { return d.fetchReceipts(localHeight + 1) }, // Receipts are only retrieved during fast sync
        func() error { return d.processHeaders(localHeight+1, pivot, td) },
    }
    return d.spawnSync(fetchers)
}

// Fetches a single header from a peer
func (d *Downloader) fetchHeader(p *peerConnection, hash common.Hash) (*types.Header, error) {
    log.Info("retrieving remote chain height", "peer", p)

    go p.peer.RequestHeadersByHash(hash, 1, 0, false)

    timeout := time.After(d.requestTTL())
    for {
        select {
        case <-d.cancelCh:
            return nil, errCancelBlockFetch

        case packet := <-d.headerCh:
            // Discard anything not from the origin peer
            if packet.PeerId() != p.id {
                log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
                break
            }
            // Make sure the peer actually gave something valid
            headers := packet.(*headerPack).headers
            if len(headers) != 1 {
                log.Info("invalid number of head headers (!= 1)", "peer", p, "len(headers)", len(headers))
                return nil, errBadPeer
            }
            return headers[0], nil

        case <-timeout:
            log.Info("head header timeout", "peer", p)
            return nil, errTimeout

        case <-d.bodyCh:
        case <-d.stateCh:
        case <-d.receiptCh:
            // Out of bounds delivery, ignore
        }
    }
}

// Not defined in go's stdlib:
func minInt(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// Fetches headers between `from` and `to`, inclusive.
// Assumes invariant: from <= to.
func (d *Downloader) fetchBoundedHeaders(p *peerConnection, from uint64, to uint64) error {
    log.Info("directing header downloads", "peer", p, "from", from, "to", to)
    defer log.Info("header download terminated", "peer", p)

    // Create a timeout timer, and the associated header fetcher
    skeleton := true            // Skeleton assembly phase or finishing up
    request := time.Now()       // time of the last skeleton fetch request
    timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
    <-timeout.C                 // timeout channel should be initially empty
    defer timeout.Stop()

    getHeaders := func(from uint64) {
        request = time.Now()
        timeout.Reset(d.requestTTL())

        skeletonStart := from + uint64(MaxHeaderFetch) - 1

        if skeleton {
            if skeletonStart > to {
                skeleton = false
            }
        }

        if skeleton {
            numSkeletonHeaders := minInt(MaxSkeletonSize, (int(to-from)+1)/MaxHeaderFetch)
            log.Trace("fetching skeleton headers", "peer", p, "num skeleton headers", numSkeletonHeaders, "from", from)
            go p.peer.RequestHeadersByNumber(skeletonStart, numSkeletonHeaders, MaxHeaderFetch-1, false)
        } else {
            // There are not enough headers remaining to warrant a skeleton fetch.
            // Grab all of the remaining headers.

            numHeaders := int(to-from) + 1
            log.Trace("fetching full headers", "peer", p, "num headers", numHeaders, "from", from)
            go p.peer.RequestHeadersByNumber(from, numHeaders, 0, false)
        }
    }
    // Start pulling the header chain skeleton until all is done
    getHeaders(from)

    for {
        select {
        case <-d.cancelCh:
            return errCancelHeaderFetch

        case packet := <-d.headerCh:
            // Make sure the active peer is giving us the skeleton headers
            if packet.PeerId() != p.id {
                log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
                break
            }
            headerReqTimer.UpdateSince(request)
            timeout.Stop()

            headers := packet.(*headerPack).headers

            // If we received a skeleton batch, resolve internals concurrently
            if skeleton {
                filled, proced, err := d.fillHeaderSkeleton(from, headers)
                if err != nil {
                    log.Debug("skeleton chain invalid", "peer", p, "err", err)
                    return errInvalidChain
                }
                headers = filled[proced:]
                from += uint64(proced)
            }
            // Insert all the new headers and fetch the next batch
            if len(headers) > 0 {
                log.Trace("schedule headers", "peer", p, "num headers", len(headers), "from", from)
                select {
                case d.headerProcCh <- headers:
                case <-d.cancelCh:
                    return errCancelHeaderFetch
                }
                from += uint64(len(headers))
            }

            if from <= to {
                getHeaders(from)
            } else {
                // Notify the content fetchers that no more headers are inbound and return.
                select {
                case d.headerProcCh <- nil:
                    return nil
                case <-d.cancelCh:
                    return errCancelHeaderFetch
                }
            }

        case <-timeout.C:
            // Header retrieval timed out, consider the peer bad and drop
            log.Info("header request timed out", "peer", p)
            headerTimeoutMeter.Mark(1)
            d.dropPeer(p.id)

            // Finish the sync gracefully instead of dumping the gathered data though
            for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                select {
                case ch <- false:
                case <-d.cancelCh:
                }
            }
            select {
            case d.headerProcCh <- nil:
            case <-d.cancelCh:
            }
            return errBadPeer
        }
    }
}
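For orientation only (not part of this diff): the bounded path above is reached through the downloader's sync mode, so a raft-mode caller that already knows the target block hash would drive it roughly like this, assuming BoundedFullSync is a SyncMode constant and Synchronise is the exported wrapper around the synchronise function patched earlier:

    // Sketch only: request a sync that stops at blockHash instead of chasing
    // the peer's chain head; syncWithPeerUntil and fetchBoundedHeaders above
    // do the bounded work once this mode is selected.
    func syncToKnownBlock(d *downloader.Downloader, peerID string, blockHash common.Hash, td *big.Int) error {
        return d.Synchronise(peerID, blockHash, td, downloader.BoundedFullSync)
    }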
@@ -153,8 +153,10 @@ func (pm *ProtocolManager) syncer() {
        }

    case <-forceSync.C:
        // Force a sync even if not enough peers are present
        go pm.synchronise(pm.peers.BestPeer())
        if !pm.raftMode {
            // Force a sync even if not enough peers are present
            go pm.synchronise(pm.peers.BestPeer())
        }

    case <-pm.noMorePeers:
        return