Added more snowman comments, fixed minor bugs

This commit is contained in:
StephenButtolph 2020-05-02 02:46:35 -04:00
parent 13a8167ceb
commit 13ea33c42c
3 changed files with 352 additions and 61 deletions

View File

@ -157,6 +157,9 @@ func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) {
return
}
// we don't use the assumption that this function is called after a failed
// Get message. So we first check to see if we have an outstanding request
// and also get what the request was for if it exists
blkID, ok := t.blkReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
@ -196,7 +199,7 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
}
// PushQuery implements the Engine interface
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blk []byte) {
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) {
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
@ -204,10 +207,24 @@ func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID,
return
}
// push queries are treated the same as Put messages immediately followed by
// pull queries
t.Put(vdr, requestID, blkID, blk)
t.PullQuery(vdr, requestID, blkID)
blk, err := t.Config.VM.ParseBlock(blkBytes)
// If the parsing fails, we just drop the request, as we didn't ask for it
if err != nil {
t.Config.Context.Log.Warn("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
return
}
// insert the block into consensus. If the block has already been issued,
// this will be a noop. If this block has missing dependencies, vdr will
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
t.insertFrom(vdr, blk)
// register the chit request
t.PullQuery(vdr, requestID, blk.ID())
}
// Chits implements the Engine interface
@ -224,6 +241,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
votes.Len(),
vdr,
requestID)
// because QueryFailed doesn't utilize the assumption that we actually
// sent a Query message, we can safely call QueryFailed here to
// potentially abandon the request.
@ -252,6 +270,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// QueryFailed implements the Engine interface
func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) {
// if the engine hasn't been bootstrapped, we won't have sent a query
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping QueryFailed due to bootstrapping")
return
@ -275,21 +294,32 @@ func (t *Transitive) Notify(msg common.Message) {
t.Config.Context.Log.Verbo("Snowman engine notified of %s from the vm", msg)
switch msg {
case common.PendingTxs:
if blk, err := t.Config.VM.BuildBlock(); err == nil {
if status := blk.Status(); status != choices.Processing {
t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status)
}
parentID := blk.Parent().ID()
if pref := t.Consensus.Preference(); !parentID.Equals(pref) {
t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref)
}
if t.insertAll(blk) {
t.Config.Context.Log.Verbo("Successfully issued new block from the VM")
} else {
t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors")
}
} else {
// the pending txs message means we should attempt to build a block.
blk, err := t.Config.VM.BuildBlock()
if err != nil {
t.Config.Context.Log.Verbo("VM.BuildBlock errored with %s", err)
return
}
// a newly created block is expected to be processing. If this check
// fails, there is potentially an error in the VM this engine is running
if status := blk.Status(); status != choices.Processing {
t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status)
}
// the newly created block should be built on top of the preferred
// block. Otherwise, the new block doesn't have the best chance of being
// confirmed.
parentID := blk.Parent().ID()
if pref := t.Consensus.Preference(); !parentID.Equals(pref) {
t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref)
}
// inserting the block shouldn't have any missing dependencies
if t.insertAll(blk) {
t.Config.Context.Log.Verbo("Successfully issued new block from the VM")
} else {
t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors")
}
default:
t.Config.Context.Log.Warn("Unexpected message from the VM: %s", msg)
@ -297,10 +327,20 @@ func (t *Transitive) Notify(msg common.Message) {
}
func (t *Transitive) repoll() {
// if we are issuing a repoll, we should gossip our current preferences to
// propagate the most likely branch as quickly as possible
prefID := t.Consensus.Preference()
t.pullSample(prefID)
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
t.pullSample(prefID)
}
}
// reinsertFrom attempts to issue the branch ending with a block, from only its
// ID, to consensus. Returns true if the block was added, or was previously
// added, to consensus. This is useful to check the local DB before requesting a
// block in case we have the block for some reason. If the block or a dependency
// is missing, the validator will be sent a Get message.
func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool {
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
@ -310,37 +350,73 @@ func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool {
return t.insertFrom(vdr, blk)
}
// insertFrom attempts to issue the branch ending with a block to consensus.
// Returns true if the block was added, or was previously added, to consensus.
// This is useful to check the local DB before requesting a block in case we
// have the block for some reason. If a dependency is missing, the validator
// will be sent a Get message.
func (t *Transitive) insertFrom(vdr ids.ShortID, blk snowman.Block) bool {
blkID := blk.ID()
// if the block has been issued, we don't need to insert it. if the block is
// already pending, we shouldn't attempt to insert it again yet
for !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) {
t.insert(blk)
parent := blk.Parent()
parentID := parent.ID()
if parentStatus := parent.Status(); !parentStatus.Fetched() {
t.sendRequest(vdr, parentID)
blk = blk.Parent()
blkID = blk.ID()
// if the parent hasn't been fetched, we need to request it to issue the
// newly inserted block
if !blk.Status().Fetched() {
t.sendRequest(vdr, blkID)
return false
}
blk = parent
blkID = parentID
}
return !t.pending.Contains(blkID)
return t.Consensus.Issued(blk)
}
// insertAll attempts to issue the branch ending with a block to consensus.
// Returns true if the block was added, or was previously added, to consensus.
// This is useful to check the local DB before requesting a block in case we
// have the block for some reason. If a dependency is missing and the dependency
// hasn't been requested, the issuance will be abandoned.
func (t *Transitive) insertAll(blk snowman.Block) bool {
blkID := blk.ID()
for blk.Status().Fetched() && !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) {
t.insert(blk)
blk = blk.Parent()
blkID = blk.ID()
}
return !t.pending.Contains(blkID)
// if issuance of the block was successful, this is the happy path
if t.Consensus.Issued(blk) {
return true
}
// if this branch is waiting on a block that we supposedly have a source of,
// we can just wait for that request to succeed or fail
if t.blkReqs.Contains(blkID) {
return false
}
// if we have no reason to expect that this block will be inserted, we
// should abandon the block to avoid a memory leak
t.blocked.Abandon(blkID)
return false
}
// attempt to insert the block to consensus. If the block's parent hasn't been
// issued, the insertion will block until the parent's issuance is abandoned or
// fulfilled
func (t *Transitive) insert(blk snowman.Block) {
blkID := blk.ID()
// mark that the block has been fetched but is pending
t.pending.Add(blkID)
// if we have any outstanding requests for this block, remove the pending
// requests
t.blkReqs.RemoveAny(blkID)
i := &issuer{
@ -348,6 +424,7 @@ func (t *Transitive) insert(blk snowman.Block) {
blk: blk,
}
// block on the parent if needed
if parent := blk.Parent(); !t.Consensus.Issued(parent) {
parentID := parent.ID()
t.Config.Context.Log.Verbo("Block waiting for parent %s", parentID)
@ -362,17 +439,22 @@ func (t *Transitive) insert(blk snowman.Block) {
}
func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) {
if !t.blkReqs.Contains(blkID) {
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.RequestID++
t.blkReqs.Add(vdr, t.RequestID, blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics
// only send one request at a time for a block
if t.blkReqs.Contains(blkID) {
return
}
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.RequestID++
t.blkReqs.Add(vdr, t.RequestID, blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
// Tracks performance statistics
t.numBlkRequests.Set(float64(t.blkReqs.Len()))
}
// send a pull request for this block ID
func (t *Transitive) pullSample(blkID ids.ID) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
@ -382,15 +464,22 @@ func (t *Transitive) pullSample(blkID ids.ID) {
vdrSet.Add(vdr.ID())
}
t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
} else if numVdrs < p.K {
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
return
}
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
}
func (t *Transitive) pushSample(blk snowman.Block) bool {
// send a push request for this block
func (t *Transitive) pushSample(blk snowman.Block) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K)
@ -399,15 +488,20 @@ func (t *Transitive) pushSample(blk snowman.Block) bool {
vdrSet.Add(vdr.ID())
}
t.RequestID++
queryIssued := false
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blk.ID(), blk.Bytes())
queryIssued = true
} else if numVdrs < p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blk.ID())
blkID := blk.ID()
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
return
}
return queryIssued
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes())
return
}
func (t *Transitive) deliver(blk snowman.Block) {
@ -415,11 +509,14 @@ func (t *Transitive) deliver(blk snowman.Block) {
return
}
// we are adding the block to consensus, so it is no longer pending
blkID := blk.ID()
t.pending.Remove(blkID)
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
// if verify fails, then all descendants are also invalid
t.blocked.Abandon(blkID)
t.numBlockedBlk.Set(float64(t.pending.Len())) // Tracks performance statistics
return
@ -427,8 +524,10 @@ func (t *Transitive) deliver(blk snowman.Block) {
t.Config.Context.Log.Verbo("Adding block to consensus: %s", blkID)
t.Consensus.Add(blk)
polled := t.pushSample(blk)
// Add all the oracle blocks if they exist. We call verify on all the blocks
// and add them to consensus before marking anything as fulfilled to avoid
// any potential reentrant bugs.
added := []snowman.Block{}
dropped := []snowman.Block{}
switch blk := blk.(type) {
@ -436,20 +535,23 @@ func (t *Transitive) deliver(blk snowman.Block) {
for _, blk := range blk.Options() {
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
t.blocked.Abandon(blk.ID())
dropped = append(dropped, blk)
} else {
t.Consensus.Add(blk)
t.pushSample(blk)
added = append(added, blk)
}
}
}
t.Config.VM.SetPreference(t.Consensus.Preference())
t.blocked.Fulfill(blkID)
// launch a query for the newly added block
t.pushSample(blk)
t.blocked.Fulfill(blkID)
for _, blk := range added {
t.pushSample(blk)
blkID := blk.ID()
t.pending.Remove(blkID)
t.blocked.Fulfill(blkID)
@ -460,9 +562,8 @@ func (t *Transitive) deliver(blk snowman.Block) {
t.blocked.Abandon(blkID)
}
if polled && len(t.polls.m) < t.Params.ConcurrentRepolls {
t.repoll()
}
// If we should issue multiple queries at the same time, we need to repoll
t.repoll()
// Tracks performance statistics
t.numBlkRequests.Set(float64(t.blkReqs.Len()))

View File

@ -1294,3 +1294,196 @@ func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
t.Fatalf("Shouldn't have abandoned the pending block")
}
}
// TestEnginePushQueryRequestIDConflict pushes a block whose parent is unknown,
// captures the request ID of the resulting Get, and then has the same
// validator send a second PushQuery that reuses that request ID with an
// unparseable payload. The later Put for the missing parent must still be
// accepted, leaving the originally pushed block as the preference.
func TestEnginePushQueryRequestIDConflict(t *testing.T) {
	vdr, _, sender, vm, te, gBlk := setup(t)

	sender.Default(true)

	// missingBlk is the parent the engine has to fetch; pendingBlk is the
	// child that arrives first through the push query.
	missingBlk := &Blk{
		parent: gBlk,
		id:     GenerateID(),
		height: 1,
		status: choices.Unknown,
		bytes:  []byte{1},
	}
	pendingBlk := &Blk{
		parent: missingBlk,
		id:     GenerateID(),
		height: 2,
		status: choices.Processing,
		bytes:  []byte{2},
	}
	randomBlkID := GenerateID()

	// wasParsed gates GetBlock: a block is only retrievable once it has been
	// parsed.
	wasParsed := false

	// serveOnly points the VM at exactly one known block; every other parse
	// or lookup fails with errUnknownBlock.
	serveOnly := func(known *Blk) {
		vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
			if !bytes.Equal(b, known.Bytes()) {
				return nil, errUnknownBlock
			}
			wasParsed = true
			return known, nil
		}
		vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
			if wasParsed && blkID.Equals(known.ID()) {
				return known, nil
			}
			return nil, errUnknownBlock
		}
	}

	serveOnly(pendingBlk)

	// Record the request ID of the Get and check that it asks the pushing
	// validator for the missing parent.
	var getRequestID uint32
	sender.GetF = func(reqVdr ids.ShortID, requestID uint32, blkID ids.ID) {
		getRequestID = requestID
		if !reqVdr.Equals(vdr.ID()) {
			t.Fatalf("Wrong validator requested")
		}
		if !blkID.Equals(missingBlk.ID()) {
			t.Fatalf("Wrong block requested")
		}
	}

	te.PushQuery(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes())

	sender.GetF = nil
	sender.CantGet = false

	// Reuse the outstanding Get's request ID in an unrelated push query whose
	// bytes the VM cannot parse.
	te.PushQuery(vdr.ID(), getRequestID, randomBlkID, []byte{3})

	// From here on the VM only knows about the missing parent.
	wasParsed = false
	serveOnly(missingBlk)

	sender.CantPushQuery = false
	sender.CantChits = false

	te.Put(vdr.ID(), getRequestID, missingBlk.ID(), missingBlk.Bytes())

	if pref := te.Consensus.Preference(); !pref.Equals(pendingBlk.ID()) {
		t.Fatalf("Shouldn't have abandoned the pending block")
	}
}
// TestEngineAggressivePolling configures the engine with ConcurrentRepolls set
// to 2 and a single validator, delivers one block through Put, and checks that
// exactly one push query and one pull query were sent.
func TestEngineAggressivePolling(t *testing.T) {
	config := DefaultConfig()
	config.Params.ConcurrentRepolls = 2

	// One validator in the sampling set.
	vdr := validators.GenerateRandomValidator(1)
	vals := validators.NewSet()
	config.Validators = vals
	vals.Add(vdr)

	// Test doubles for the network sender and the VM.
	sender := &common.SenderTest{}
	sender.T = t
	config.Sender = sender
	sender.Default(true)

	vm := &VMTest{}
	vm.T = t
	config.VM = vm
	vm.Default(true)
	vm.CantSetPreference = false

	// Already-accepted genesis block that the VM reports as last accepted.
	gBlk := &Blk{
		id:     GenerateID(),
		status: choices.Accepted,
	}

	vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
	sender.CantGetAcceptedFrontier = false

	te := &Transitive{}
	te.Initialize(config)

	// While bootstrapping finishes, only the genesis block may be looked up.
	vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
		if !blkID.Equals(gBlk.ID()) {
			t.Fatalf("Wrong block requested")
		}
		return gBlk, nil
	}
	te.finishBootstrapping()

	// Put the mocks back into their strict state.
	vm.GetBlockF = nil
	vm.LastAcceptedF = nil
	sender.CantGetAcceptedFrontier = true

	sender.Default(true)

	pendingBlk := &Blk{
		parent: gBlk,
		id:     GenerateID(),
		height: 2,
		status: choices.Processing,
		bytes:  []byte{1},
	}

	// The block can only be fetched from the VM after it has been parsed.
	wasParsed := false
	vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
		if !bytes.Equal(b, pendingBlk.Bytes()) {
			return nil, errUnknownBlock
		}
		wasParsed = true
		return pendingBlk, nil
	}
	vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
		if wasParsed && blkID.Equals(pendingBlk.ID()) {
			return pendingBlk, nil
		}
		return nil, errUnknownBlock
	}

	// Count the outgoing queries of each kind.
	pushCount, pullCount := 0, 0
	sender.PushQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID, _ []byte) { pushCount++ }
	sender.PullQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID) { pullCount++ }

	te.Put(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes())

	if pushCount != 1 {
		t.Fatalf("Should have initially sent a push query")
	}
	if pullCount != 1 {
		t.Fatalf("Should have sent an additional pull query")
	}
}

View File

@ -56,10 +56,7 @@ func (v *voter) Update() {
}
v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce")
if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls {
v.t.repoll()
}
v.t.repoll()
}
func (v *voter) bubbleVotes(votes ids.Bag) ids.Bag {