diff --git a/snow/consensus/snowman/snowman_block.go b/snow/consensus/snowman/snowman_block.go index b0c1ba4..dd69387 100644 --- a/snow/consensus/snowman/snowman_block.go +++ b/snow/consensus/snowman/snowman_block.go @@ -16,7 +16,7 @@ type snowmanBlock struct { // block that this node contains. For the genesis, this value will be nil blk Block - // shouldFalter is set to true if this node, and all its decendants received + // shouldFalter is set to true if this node, and all its descendants received // less than Alpha votes shouldFalter bool diff --git a/snow/consensus/snowman/topological.go b/snow/consensus/snowman/topological.go index 459673f..e852030 100644 --- a/snow/consensus/snowman/topological.go +++ b/snow/consensus/snowman/topological.go @@ -149,7 +149,7 @@ func (ts *Topological) Preference() ids.ID { return ts.tail } // During the sort, votes are pushed towards the genesis. To prevent interating // over all blocks that had unsuccessful polls, we set a flag on the block to // know that any future traversal through that block should register an -// unsuccessful poll on that block and every decendant block. +// unsuccessful poll on that block and every descendant block. // // The complexity of this function is: // - Runtime = 3 * |live set| + |votes| @@ -408,7 +408,7 @@ func (ts *Topological) getPreferredDecendent(blkID ids.ID) ids.ID { // accept the preferred child of the provided snowman block. By accepting the // preferred child, all other children will be rejected. When these children are -// rejected, all their decendants will be rejected. +// rejected, all their descendants will be rejected. 
func (ts *Topological) accept(n *snowmanBlock) { // We are finalizing the block's child, so we need to get the preference pref := n.sb.Preference() @@ -451,11 +451,11 @@ func (ts *Topological) accept(n *snowmanBlock) { rejects = append(rejects, childID) } - // reject all the decendants of the blocks we just rejected + // reject all the descendants of the blocks we just rejected ts.rejectTransitively(rejects) } -// Takes in a list of rejected ids and rejects all decendants of these IDs +// Takes in a list of rejected ids and rejects all descendants of these IDs func (ts *Topological) rejectTransitively(rejected []ids.ID) { // the rejected array is treated as a queue, with the next element at index // 0 and the last element at the end of the slice. diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index 6f3ac57..22fd828 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -37,7 +37,7 @@ type FrontierHandler interface { // Notify this engine of a request for the accepted frontier of vertices. // // The accepted frontier is the set of accepted vertices that do not have - // any accepted decendants. + // any accepted descendants. // // This function can be called by any validator. It is not safe to assume // this message is utilizing a unique requestID. However, the validatorID is @@ -149,9 +149,9 @@ type FetchHandler interface { // anticipated to be responded to. This could be because the recipient of // the message is unknown or if the message request has timed out. // - // The validatorID, requestID, and containerID are assumed to be the same as - // those sent in the Get message. - GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) + // The validatorID and requestID are assumed to be the same as those sent in + // the Get message. 
+ GetFailed(validatorID ids.ShortID, requestID uint32) } // QueryHandler defines how a consensus engine reacts to query messages from diff --git a/snow/engine/snowman/requests.go b/snow/engine/snowman/requests.go new file mode 100644 index 0000000..3945d23 --- /dev/null +++ b/snow/engine/snowman/requests.go @@ -0,0 +1,60 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package snowman + +import ( + "github.com/ava-labs/gecko/ids" +) + +// Requests tracks requested container IDs, keyed by validator and request ID. +type Requests struct { + reqs map[[20]byte]map[uint32]ids.ID + requested ids.Set +} + +// Add records that containerID was requested from vdr under requestID. +func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) { + if r.reqs == nil { + r.reqs = make(map[[20]byte]map[uint32]ids.ID) + } + vdrKey := vdr.Key() + vdrReqs, ok := r.reqs[vdrKey] + if !ok { + vdrReqs = make(map[uint32]ids.ID) + r.reqs[vdrKey] = vdrReqs + } + vdrReqs[requestID] = containerID + r.requested.Add(containerID) +} + +// Remove forgets the request (vdr, requestID), returning its container ID and whether the request was known. +func (r *Requests) Remove(vdr ids.ShortID, requestID uint32) (ids.ID, bool) { + if r.reqs == nil { + return ids.ID{}, false + } + vdrKey := vdr.Key() + vdrReqs, ok := r.reqs[vdrKey] + if !ok { + return ids.ID{}, false + } + containerID, ok := vdrReqs[requestID] + if !ok { + return ids.ID{}, false + } + + if len(vdrReqs) == 1 { + delete(r.reqs, vdrKey) + } else { + delete(vdrReqs, requestID) + } + r.requested.Remove(containerID) + + return containerID, true +} + +// Len returns the number of container IDs currently marked as requested. +func (r *Requests) Len() int { return r.requested.Len() } + +// Contains reports whether containerID is currently marked as requested. 
+func (r *Requests) Contains(containerID ids.ID) bool { return r.requested.Contains(containerID) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 9a827a6..eb74de9 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -19,12 +19,22 @@ type Transitive struct { Config bootstrapper - polls polls // track people I have asked for their preference + // track outstanding preference requests + polls polls - blkReqs, pending ids.Set // prevent asking validators for the same block + // blocks that have outstanding get requests + blkReqs Requests - blocked events.Blocker // track operations that are blocked on blocks + // blocks that are fetched but haven't been issued due to missing + // dependencies + pending ids.Set + // operations that are blocked on a block being issued. This could be + // issuing another block, responding to a query, or applying votes to + // consensus + blocked events.Blocker + + // mark for if the engine has been bootstrapped or not bootstrapped bool } @@ -33,7 +43,11 @@ func (t *Transitive) Initialize(config Config) { config.Context.Log.Info("Initializing Snowman consensus") t.Config = config - t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics) + t.metrics.Initialize( + config.Context.Log, + config.Params.Namespace, + config.Params.Metrics, + ) t.onFinished = t.finishBootstrapping t.bootstrapper.Initialize(config.BootstrapConfig) @@ -44,11 +58,19 @@ func (t *Transitive) Initialize(config Config) { t.polls.m = make(map[uint32]poll) } +// when bootstrapping is finished, this will be called. This initializes the +// consensus engine with the last accepted block. 
func (t *Transitive) finishBootstrapping() { + // set the bootstrapped mark to switch consensus modes t.bootstrapped = true + + // initialize consensus to the last accepted blockID tailID := t.Config.VM.LastAccepted() t.Consensus.Initialize(t.Config.Context, t.Params, tailID) + // to maintain the invariant that oracle blocks are issued in the correct + // preferences, we need to handle the case that we are bootstrapping into an + // oracle block tail, err := t.Config.VM.GetBlock(tailID) if err != nil { t.Config.Context.Log.Error("Failed to get last accepted block due to: %s", err) @@ -58,9 +80,12 @@ func (t *Transitive) finishBootstrapping() { switch blk := tail.(type) { case OracleBlock: for _, blk := range blk.Options() { + // note that deliver will set the VM's preference t.deliver(blk) } default: + // if there aren't blocks we need to deliver on startup, we need to set + // the preference to the last accepted block t.Config.VM.SetPreference(tailID) } } @@ -76,15 +101,27 @@ func (t *Transitive) Context() *snow.Context { return t.Config.Context } // Get implements the Engine interface func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) { - if blk, err := t.Config.VM.GetBlock(blkID); err == nil { - t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes()) + blk, err := t.Config.VM.GetBlock(blkID) + if err != nil { + // If we failed to get the block, that means either an unexpected error + // has occurred, the validator is not following the protocol, or the + // block has been pruned. + t.Config.Context.Log.Warn("Get called for blockID %s errored with %s", + blkID, + err) + return } + + // Respond to the validator with the fetched block and the same requestID. 
+ t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes()) } // Put implements the Engine interface func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) { t.Config.Context.Log.Verbo("Put called for blockID %s", blkID) + // if the engine hasn't been bootstrapped, forward the request to the + // bootstrapper if !t.bootstrapped { t.bootstrapper.Put(vdr, requestID, blkID, blkBytes) return @@ -95,32 +132,50 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt t.Config.Context.Log.Warn("ParseBlock failed due to %s for block:\n%s", err, formatting.DumpBytes{Bytes: blkBytes}) - t.GetFailed(vdr, requestID, blkID) + + // because GetFailed doesn't utilize the assumption that we actually + // sent a Get message, we can safely call GetFailed here to potentially + // abandon the request. + t.GetFailed(vdr, requestID) return } + // insert the block into consensus. If the block has already been issued, + // this will be a noop. If this block has missing dependencies, vdr will + // receive requests to fill the ancestry. dependencies that have already + // been fetched, but with missing dependencies themselves won't be requested + // from the vdr. 
t.insertFrom(vdr, blk) } // GetFailed implements the Engine interface -func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, blkID ids.ID) { +func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) { + // if the engine hasn't been bootstrapped, forward the request to the + // bootstrapper if !t.bootstrapped { - t.bootstrapper.GetFailed(vdr, requestID, blkID) + t.bootstrapper.GetFailed(vdr, requestID) return } - t.pending.Remove(blkID) - t.blocked.Abandon(blkID) - t.blkReqs.Remove(blkID) + blkID, ok := t.blkReqs.Remove(vdr, requestID) + if !ok { + t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s", + vdr) + return + } - // Tracks performance statistics - t.numBlockedBlk.Set(float64(t.pending.Len())) + // because the get request was dropped, we no longer expect blkID to + // be issued. + t.blocked.Abandon(blkID) } // PullQuery implements the Engine interface func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) { + // if the engine hasn't been bootstrapped, we aren't ready to respond to + // queries if !t.bootstrapped { - t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", blkID) + t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", + blkID) return } @@ -131,6 +186,8 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) requestID: requestID, } + // if we aren't able to have issued this block, then it is a dependency for + // this reply if !t.reinsertFrom(vdr, blkID) { c.deps.Add(blkID) } @@ -140,17 +197,22 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) // PushQuery implements the Engine interface func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blk []byte) { + // if the engine hasn't been bootstrapped, we aren't ready to respond to + // queries if !t.bootstrapped { t.Config.Context.Log.Debug("Dropping PushQuery for %s due 
to bootstrapping", blkID) return } + // push queries are treated the same as Put messages immediately followed by + // pull queries t.Put(vdr, requestID, blkID, blk) t.PullQuery(vdr, requestID, blkID) } // Chits implements the Engine interface func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { + // if the engine hasn't been bootstrapped, we shouldn't be receiving chits if !t.bootstrapped { t.Config.Context.Log.Warn("Dropping Chits due to bootstrapping") return @@ -158,7 +220,13 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { // Since this is snowman, there should only be one ID in the vote set if votes.Len() != 1 { - t.Config.Context.Log.Warn("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", votes.Len(), vdr, requestID) + t.Config.Context.Log.Warn("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", + votes.Len(), + vdr, + requestID) + // because QueryFailed doesn't utilize the assumption that we actually + // sent a Query message, we can safely call QueryFailed here to + // potentially abandon the request. 
t.QueryFailed(vdr, requestID) return } @@ -173,6 +241,8 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { response: vote, } + // if we aren't able to have issued the vote's block, then it is a + // dependency for applying the vote if !t.reinsertFrom(vdr, vote) { v.deps.Add(vote) } @@ -196,6 +266,7 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) { // Notify implements the Engine interface func (t *Transitive) Notify(msg common.Message) { + // if the engine hasn't been bootstrapped, we shouldn't be issuing blocks if !t.bootstrapped { t.Config.Context.Log.Warn("Dropping Notify due to bootstrapping") return @@ -292,13 +363,13 @@ func (t *Transitive) insert(blk snowman.Block) { func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) { if !t.blkReqs.Contains(blkID) { - t.blkReqs.Add(blkID) - - t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics + t.Config.Context.Log.Verbo("Sending Get message for %s", blkID) t.RequestID++ - t.Config.Context.Log.Verbo("Sending Get message for %s", blkID) + t.blkReqs.Add(vdr, t.RequestID, blkID) t.Config.Sender.Get(vdr, t.RequestID, blkID) + + t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics } }