add comments. add GetAncestorsFailed to router/sender

This commit is contained in:
Dan Laine 2020-06-04 10:54:26 -04:00
parent 14480a24dc
commit 79a8c21457
9 changed files with 69 additions and 13 deletions

View File

@ -40,7 +40,7 @@ type bootstrapper struct {
// number of vertices processed so far
numProcessed uint32
// outstandingRequests tracks which validators were asked for which containers in which requests
// tracks which validators were asked for which containers in which requests
outstandingRequests common.Requests
// Contains IDs of vertices that have recently been processed

View File

@ -194,6 +194,16 @@ type FetchHandler interface {
// The validatorID and requestID are assumed to be the same as those sent in
// the Get message.
GetFailed(validatorID ids.ShortID, requestID uint32) error
// Notify this engine that a GetAncestors request it issued has failed.
//
// This function will be called if the engine sent a GetAncestors message that is not
// anticipated to be responded to. This could be because the recipient of
// the message is unknown or if the message request has timed out.
//
// The validatorID and requestID are assumed to be the same as those sent in
// the Get message.
GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) error
}
// QueryHandler defines how a consensus engine reacts to query messages from

View File

@ -34,6 +34,7 @@ type EngineTest struct {
CantGet,
CantGetAncestors,
CantGetFailed,
CantGetAncestorsFailed,
CantPut,
CantMultiPut,
@ -42,14 +43,15 @@ type EngineTest struct {
CantQueryFailed,
CantChits bool
ContextF func() *snow.Context
StartupF, GossipF, ShutdownF func() error
NotifyF func(Message) error
GetF, GetAncestorsF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error
MultiPutF func(validatorID ids.ShortID, requestID uint32, containers [][]byte) error
AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error
GetAcceptedFrontierF, GetFailedF, QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error
ContextF func() *snow.Context
StartupF, GossipF, ShutdownF func() error
NotifyF func(Message) error
GetF, GetAncestorsF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error
MultiPutF func(validatorID ids.ShortID, requestID uint32, containers [][]byte) error
AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error
GetAcceptedFrontierF, GetFailedF, GetAncestorsFailedF,
QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error
}
var _ Engine = &EngineTest{}
@ -261,6 +263,19 @@ func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32) error
return nil
}
// GetAncestorsFailed ...
func (e *EngineTest) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) error {
if e.GetAncestorsFailedF != nil {
return e.GetAncestorsFailedF(validatorID, requestID)
} else if e.CantGetAncestorsFailed {
if e.T != nil {
e.T.Fatalf("Unexpectedly called GetAncestorsFailed")
}
return errors.New("Unexpectedly called GetAncestorsFailed")
}
return nil
}
// Put ...
func (e *EngineTest) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error {
if e.PutF != nil {

View File

@ -32,12 +32,16 @@ type bootstrapper struct {
metrics
common.Bootstrapper
// Number of blocks processed
numProcessed uint32
// outstandingRequests tracks which validators were asked for which containers in which requests
// tracks which validators were asked for which containers in which requests
outstandingRequests common.Requests
finished bool
// true if bootstrapping is done
finished bool
// Called when bootstrapping is done
onFinished func() error
}

View File

@ -102,6 +102,8 @@ func (h *Handler) dispatchMsg(msg message) bool {
err = h.engine.GetAncestors(msg.validatorID, msg.requestID, msg.containerID)
case getFailedMsg:
err = h.engine.GetFailed(msg.validatorID, msg.requestID)
case getAncestorsFailedMsg:
err = h.engine.GetAncestorsFailed(msg.validatorID, msg.requestID)
case putMsg:
err = h.engine.Put(msg.validatorID, msg.requestID, msg.containerID, msg.container)
case multiPutMsg:
@ -242,6 +244,15 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
}
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
h.msgs <- message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
}
}
// PushQuery passes a PushQuery message received from the network to the consensus engine.
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) {
h.msgs <- message{

View File

@ -33,6 +33,7 @@ const (
shutdownMsg
getAncestorsMsg
multiPutMsg
getAncestorsFailedMsg
)
type message struct {

View File

@ -49,5 +49,6 @@ type InternalRouter interface {
GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
}

View File

@ -222,7 +222,7 @@ func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, request
sr.lock.RLock()
defer sr.lock.RUnlock()
// This message came in response to a Get message from this node, and when we sent that Get
// This message came in response to a GetAncestors message from this node, and when we sent that
// message we set a timeout. Since we got a response, cancel the timeout.
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
@ -246,6 +246,20 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
}
}
// GetAncestorsFailed routes an incoming GetAncestorsFailed message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAncestorsFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
}
}
// PushQuery routes an incoming PushQuery request from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {

View File

@ -98,7 +98,7 @@ func (s *Sender) GetAncestors(validatorID ids.ShortID, requestID uint32, contain
s.ctx.Log.Verbo("Sending GetAncestors to validator %s. RequestID: %d. ContainerID: %s", validatorID, requestID, containerID)
// Sending a GetAncestors to myself will always fail
if validatorID.Equals(s.ctx.NodeID) {
go s.router.GetFailed(validatorID, s.ctx.ChainID, requestID)
go s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
return
}
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {