added Raft BoundedFullSync functions which were overwritten as a part upstream merge

This commit is contained in:
vsmk98 2018-08-30 06:59:45 +00:00 committed by amalrajmani
parent 12b6c5376f
commit 912809810a
3 changed files with 243 additions and 23 deletions

View File

@ -535,41 +535,21 @@ var (
Value: whisper.DefaultMinimumPoW,
}
// Metrics flags
MetricsEnabledFlag = cli.BoolFlag{
Name: metrics.MetricsEnabledFlag,
Usage: "Enable metrics collection and reporting",
}
// Raft flags
RaftModeFlag = cli.BoolFlag{
Name: "raft",
Usage: "If enabled, uses Raft instead of Quorum Chain for consensus",
}
MetricsEnableInfluxDBFlag = cli.BoolFlag{
Name: "metrics.influxdb",
Usage: "Enable metrics export/push to an external InfluxDB database",
}
RaftBlockTimeFlag = cli.IntFlag{
Name: "raftblocktime",
Usage: "Amount of time between raft block creations in milliseconds",
Value: 50,
}
MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint",
Usage: "InfluxDB API endpoint to report metrics to",
Value: "http://localhost:8086",
}
RaftJoinExistingFlag = cli.IntFlag{
Name: "raftjoinexisting",
Usage: "The raft ID to assume when joining an pre-existing cluster",
Value: 0,
}
MetricsInfluxDBDatabaseFlag = cli.StringFlag{
Name: "metrics.influxdb.database",
Usage: "InfluxDB database name to push reported metrics to",
Value: "geth",
}
EmitCheckpointsFlag = cli.BoolFlag{
Name: "emitcheckpoints",
Usage: "If enabled, emit specially formatted logging checkpoints",
@ -597,6 +577,26 @@ var (
Usage: "Default minimum difference between two consecutive block's timestamps in seconds",
Value: eth.DefaultConfig.Istanbul.BlockPeriod,
}
// Metrics flags
MetricsEnabledFlag = cli.BoolFlag{
Name: metrics.MetricsEnabledFlag,
Usage: "Enable metrics collection and reporting",
}
MetricsEnableInfluxDBFlag = cli.BoolFlag{
Name: "metrics.influxdb",
Usage: "Enable metrics export/push to an external InfluxDB database",
}
MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint",
Usage: "InfluxDB API endpoint to report metrics to",
Value: "http://localhost:8086",
}
MetricsInfluxDBDatabaseFlag = cli.StringFlag{
Name: "metrics.influxdb.database",
Usage: "InfluxDB database name to push reported metrics to",
Value: "geth",
}
MetricsInfluxDBUsernameFlag = cli.StringFlag{
Name: "metrics.influxdb.username",
Usage: "Username to authorize access to the database",

View File

@ -396,6 +396,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if p == nil {
return errUnknownPeer
}
if d.mode == BoundedFullSync {
err := d.syncWithPeerUntil(p, hash, td)
if err == nil {
d.processFullSyncContent()
}
return err
}
return d.syncWithPeer(p, hash, td)
}
@ -1284,7 +1291,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
if d.mode == FullSync || d.mode == FastSync || d.mode == BoundedFullSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
@ -1640,3 +1647,214 @@ func (d *Downloader) requestTTL() time.Duration {
}
return ttl
}
// Extra downloader functionality for non-proof-of-work consensus
// Synchronizes with a peer, but only up to the provided Hash
func (d *Downloader) syncWithPeerUntil(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
}
}()
if p.version < 62 {
return errTooOld
}
log.Info("Synchronising with the network", "id", p.id, "version", p.version)
defer func(start time.Time) {
log.Info("Synchronisation terminated", "duration", time.Since(start))
}(time.Now())
remoteHeader, err := d.fetchHeader(p, hash)
if err != nil {
return err
}
remoteHeight := remoteHeader.Number.Uint64()
localHeight := d.blockchain.CurrentBlock().NumberU64()
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= localHeight || d.syncStatsChainOrigin > localHeight {
d.syncStatsChainOrigin = localHeight
}
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()
d.queue.Prepare(localHeight+1, d.mode)
if d.syncInitHook != nil {
d.syncInitHook(localHeight, remoteHeight)
}
pivot := uint64(0)
fetchers := []func() error{
func() error { return d.fetchBoundedHeaders(p, localHeight+1, remoteHeight) },
func() error { return d.fetchBodies(localHeight + 1) },
func() error { return d.fetchReceipts(localHeight + 1) }, // Receipts are only retrieved during fast sync
func() error { return d.processHeaders(localHeight+1, pivot, td) },
}
return d.spawnSync(fetchers)
}
// Fetches a single header from a peer
func (d *Downloader) fetchHeader(p *peerConnection, hash common.Hash) (*types.Header, error) {
log.Info("retrieving remote chain height", "peer", p)
go p.peer.RequestHeadersByHash(hash, 1, 0, false)
timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
log.Info("invalid number of head headers (!= 1)", "peer", p, "len(headers)", len(headers))
return nil, errBadPeer
}
return headers[0], nil
case <-timeout:
log.Info("head header timeout", "peer", p)
return nil, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}
// Not defined in go's stdlib:
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// Fetches headers between `from` and `to`, inclusive.
// Assumes invariant: from <= to.
func (d *Downloader) fetchBoundedHeaders(p *peerConnection, from uint64, to uint64) error {
log.Info("directing header downloads", "peer", p, "from", from, "to", to)
defer log.Info("header download terminated", "peer", p)
// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(d.requestTTL())
skeletonStart := from + uint64(MaxHeaderFetch) - 1
if skeleton {
if skeletonStart > to {
skeleton = false
}
}
if skeleton {
numSkeletonHeaders := minInt(MaxSkeletonSize, (int(to-from)+1)/MaxHeaderFetch)
log.Trace("fetching skeleton headers", "peer", p, "num skeleton headers", numSkeletonHeaders, "from", from)
go p.peer.RequestHeadersByNumber(skeletonStart, numSkeletonHeaders, MaxHeaderFetch-1, false)
} else {
// There are not enough headers remaining to warrant a skeleton fetch.
// Grab all of the remaining headers.
numHeaders := int(to-from) + 1
log.Trace("fetching full headers", "peer", p, "num headers", numHeaders, "from", from)
go p.peer.RequestHeadersByNumber(from, numHeaders, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
getHeaders(from)
for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch
case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
headers := packet.(*headerPack).headers
// If we received a skeleton batch, resolve internals concurrently
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
log.Debug("skeleton chain invalid", "peer", p, "err", err)
return errInvalidChain
}
headers = filled[proced:]
from += uint64(proced)
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
log.Trace("schedule headers", "peer", p, "num headers", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
}
from += uint64(len(headers))
}
if from <= to {
getHeaders(from)
} else {
// Notify the content fetchers that no more headers are inbound and return.
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
}
}
case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
log.Info("header request timed out", "peer", p)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
select {
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
}
}
}

View File

@ -153,8 +153,10 @@ func (pm *ProtocolManager) syncer() {
}
case <-forceSync.C:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
if !pm.raftMode {
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
}
case <-pm.noMorePeers:
return