This commit is contained in:
StephenButtolph 2020-04-30 11:57:43 -04:00
parent a260eb841f
commit 59e83e6159
5 changed files with 160 additions and 29 deletions

View File

@ -16,7 +16,7 @@ type snowmanBlock struct {
// block that this node contains. For the genesis, this value will be nil
blk Block
// shouldFalter is set to true if this node, and all its decendants received
// shouldFalter is set to true if this node, and all its descendants received
// less than Alpha votes
shouldFalter bool

View File

@ -149,7 +149,7 @@ func (ts *Topological) Preference() ids.ID { return ts.tail }
// During the sort, votes are pushed towards the genesis. To prevent interating
// over all blocks that had unsuccessful polls, we set a flag on the block to
// know that any future traversal through that block should register an
// unsuccessful poll on that block and every decendant block.
// unsuccessful poll on that block and every descendant block.
//
// The complexity of this function is:
// - Runtime = 3 * |live set| + |votes|
@ -408,7 +408,7 @@ func (ts *Topological) getPreferredDecendent(blkID ids.ID) ids.ID {
// accept the preferred child of the provided snowman block. By accepting the
// preferred child, all other children will be rejected. When these children are
// rejected, all their decendants will be rejected.
// rejected, all their descendants will be rejected.
func (ts *Topological) accept(n *snowmanBlock) {
// We are finalizing the block's child, so we need to get the preference
pref := n.sb.Preference()
@ -451,11 +451,11 @@ func (ts *Topological) accept(n *snowmanBlock) {
rejects = append(rejects, childID)
}
// reject all the decendants of the blocks we just rejected
// reject all the descendants of the blocks we just rejected
ts.rejectTransitively(rejects)
}
// Takes in a list of rejected ids and rejects all decendants of these IDs
// Takes in a list of rejected ids and rejects all descendants of these IDs
func (ts *Topological) rejectTransitively(rejected []ids.ID) {
// the rejected array is treated as a queue, with the next element at index
// 0 and the last element at the end of the slice.

View File

@ -37,7 +37,7 @@ type FrontierHandler interface {
// Notify this engine of a request for the accepted frontier of vertices.
//
// The accepted frontier is the set of accepted vertices that do not have
// any accepted decendants.
// any accepted descendants.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. However, the validatorID is
@ -149,9 +149,9 @@ type FetchHandler interface {
// anticipated to be responded to. This could be because the recipient of
// the message is unknown or if the message request has timed out.
//
// The validatorID, requestID, and containerID are assumed to be the same as
// those sent in the Get message.
GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
// The validatorID and requestID are assumed to be the same as those sent in
// the Get message.
GetFailed(validatorID ids.ShortID, requestID uint32)
}
// QueryHandler defines how a consensus engine reacts to query messages from

View File

@ -0,0 +1,60 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package snowman
import (
"github.com/ava-labs/gecko/ids"
)
// Requests ...
type Requests struct {
reqs map[[20]byte]map[uint32]ids.ID
requested ids.Set
}
// Add ...
func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) {
if r.reqs == nil {
r.reqs = make(map[[20]byte]map[uint32]ids.ID)
}
vdrKey := vdr.Key()
vdrReqs, ok := r.reqs[vdrKey]
if !ok {
vdrReqs = make(map[uint32]ids.ID)
r.reqs[vdrKey] = vdrReqs
}
vdrReqs[requestID] = containerID
r.requested.Add(containerID)
}
// Remove ...
func (r *Requests) Remove(vdr ids.ShortID, requestID uint32) (ids.ID, bool) {
if r.reqs == nil {
return ids.ID{}, false
}
vdrKey := vdr.Key()
vdrReqs, ok := r.reqs[vdrKey]
if !ok {
return ids.ID{}, false
}
containerID, ok := vdrReqs[requestID]
if !ok {
return ids.ID{}, false
}
if len(vdrReqs) == 1 {
delete(r.reqs, vdrKey)
} else {
delete(vdrReqs, requestID)
}
r.requested.Remove(containerID)
return containerID, true
}
// Len ...
func (r *Requests) Len() int { return r.requested.Len() }
// Contains ...
func (r *Requests) Contains(containerID ids.ID) bool { return r.requested.Contains(containerID) }

View File

@ -19,12 +19,22 @@ type Transitive struct {
Config
bootstrapper
polls polls // track people I have asked for their preference
// track outstanding preference requests
polls polls
blkReqs, pending ids.Set // prevent asking validators for the same block
// blocks that have outstaning get requests
blkReqs Requests
blocked events.Blocker // track operations that are blocked on blocks
// blocks that are fetched but haven't been issued due to missing
// dependencies
pending ids.Set
// operations that are blocked on a block being issued. This could be
// issuing another block, responding to a query, or applying votes to
// consensus
blocked events.Blocker
// mark for if the engine has been bootstrapped or not
bootstrapped bool
}
@ -33,7 +43,11 @@ func (t *Transitive) Initialize(config Config) {
config.Context.Log.Info("Initializing Snowman consensus")
t.Config = config
t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics)
t.metrics.Initialize(
config.Context.Log,
config.Params.Namespace,
config.Params.Metrics,
)
t.onFinished = t.finishBootstrapping
t.bootstrapper.Initialize(config.BootstrapConfig)
@ -44,11 +58,19 @@ func (t *Transitive) Initialize(config Config) {
t.polls.m = make(map[uint32]poll)
}
// when bootstrapping is finished, this will be called. This initializes the
// consensus engine with the last accepted block.
func (t *Transitive) finishBootstrapping() {
// set the bootstrapped mark to switch consensus modes
t.bootstrapped = true
// initialize consensus to the last accepted blockID
tailID := t.Config.VM.LastAccepted()
t.Consensus.Initialize(t.Config.Context, t.Params, tailID)
// to maintain the invariant that oracle blocks are issued in the correct
// preferences, we need to handle the case that we are bootstrapping into an
// oracle block
tail, err := t.Config.VM.GetBlock(tailID)
if err != nil {
t.Config.Context.Log.Error("Failed to get last accepted block due to: %s", err)
@ -58,9 +80,12 @@ func (t *Transitive) finishBootstrapping() {
switch blk := tail.(type) {
case OracleBlock:
for _, blk := range blk.Options() {
// note that deliver will set the VM's preference
t.deliver(blk)
}
default:
// if there aren't blocks we need to deliver on startup, we need to set
// the preference to the last accepted block
t.Config.VM.SetPreference(tailID)
}
}
@ -76,15 +101,27 @@ func (t *Transitive) Context() *snow.Context { return t.Config.Context }
// Get implements the Engine interface
func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
if blk, err := t.Config.VM.GetBlock(blkID); err == nil {
t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes())
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
// If we failed to get the block, that means either an unexpected error
// has occurred, the validator is not following the protocol, or the
// block has been pruned.
t.Config.Context.Log.Warn("Get called for blockID %s errored with %s",
blkID,
err)
return
}
// Respond to the validator with the fetched block and the same requestID.
t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes())
}
// Put implements the Engine interface
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) {
t.Config.Context.Log.Verbo("Put called for blockID %s", blkID)
// if the engine hasn't been bootstrapped, forward the request to the
// bootstrapper
if !t.bootstrapped {
t.bootstrapper.Put(vdr, requestID, blkID, blkBytes)
return
@ -95,32 +132,50 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt
t.Config.Context.Log.Warn("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
t.GetFailed(vdr, requestID, blkID)
// because GetFailed doesn't utilize the assumption that we actually
// sent a Get message, we can safely call GetFailed here to potentially
// abandon the request.
t.GetFailed(vdr, requestID)
return
}
// insert the block into consensus. If the block has already been issued,
// this will be a noop. If this block has missing dependencies, vdr will
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
t.insertFrom(vdr, blk)
}
// GetFailed implements the Engine interface
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) {
// if the engine hasn't been bootstrapped, forward the request to the
// bootstrapper
if !t.bootstrapped {
t.bootstrapper.GetFailed(vdr, requestID, blkID)
t.bootstrapper.GetFailed(vdr, requestID)
return
}
t.pending.Remove(blkID)
t.blocked.Abandon(blkID)
t.blkReqs.Remove(blkID)
blkID, ok := t.blkReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
vdr)
return
}
// Tracks performance statistics
t.numBlockedBlk.Set(float64(t.pending.Len()))
// because the get request was dropped, we no longer are expected blkID to
// be issued.
t.blocked.Abandon(blkID)
}
// PullQuery implements the Engine interface
func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", blkID)
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping",
blkID)
return
}
@ -131,6 +186,8 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
requestID: requestID,
}
// if we aren't able to have issued this block, then it is a dependency for
// this reply
if !t.reinsertFrom(vdr, blkID) {
c.deps.Add(blkID)
}
@ -140,17 +197,22 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
// PushQuery implements the Engine interface
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blk []byte) {
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", blkID)
return
}
// push queries are treated the same as Put messages immediately followed by
// pull queries
t.Put(vdr, requestID, blkID, blk)
t.PullQuery(vdr, requestID, blkID)
}
// Chits implements the Engine interface
func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// if the engine hasn't been bootstrapped, we shouldn't be receiving chits
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping Chits due to bootstrapping")
return
@ -158,7 +220,13 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// Since this is snowman, there should only be one ID in the vote set
if votes.Len() != 1 {
t.Config.Context.Log.Warn("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", votes.Len(), vdr, requestID)
t.Config.Context.Log.Warn("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d",
votes.Len(),
vdr,
requestID)
// because QueryFailed doesn't utilize the assumption that we actually
// sent a Query message, we can safely call QueryFailed here to
// potentially abandon the request.
t.QueryFailed(vdr, requestID)
return
}
@ -173,6 +241,8 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
response: vote,
}
// if we aren't able to have issued the vote's block, then it is a
// dependency for applying the vote
if !t.reinsertFrom(vdr, vote) {
v.deps.Add(vote)
}
@ -196,6 +266,7 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) {
// Notify implements the Engine interface
func (t *Transitive) Notify(msg common.Message) {
// if the engine hasn't been bootstrapped, we shouldn't issuing blocks
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping Notify due to bootstrapping")
return
@ -292,13 +363,13 @@ func (t *Transitive) insert(blk snowman.Block) {
func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) {
if !t.blkReqs.Contains(blkID) {
t.blkReqs.Add(blkID)
t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.RequestID++
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.blkReqs.Add(vdr, t.RequestID, blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics
}
}