mirror of https://github.com/poanetwork/gecko.git
Removed potential deadlock in registering a new chain
This commit is contained in:
parent
bc31669e4f
commit
367b5c30db
|
@ -246,6 +246,7 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids
|
||||||
|
|
||||||
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
|
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
|
||||||
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
|
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
|
||||||
|
h.metrics.pending.Inc()
|
||||||
h.msgs <- message{
|
h.msgs <- message{
|
||||||
messageType: getAncestorsMsg,
|
messageType: getAncestorsMsg,
|
||||||
validatorID: validatorID,
|
validatorID: validatorID,
|
||||||
|
@ -268,6 +269,7 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
|
||||||
|
|
||||||
// MultiPut passes a MultiPut message received from the network to the consensus engine.
|
// MultiPut passes a MultiPut message received from the network to the consensus engine.
|
||||||
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) {
|
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) {
|
||||||
|
h.metrics.pending.Inc()
|
||||||
h.msgs <- message{
|
h.msgs <- message{
|
||||||
messageType: multiPutMsg,
|
messageType: multiPutMsg,
|
||||||
validatorID: validatorID,
|
validatorID: validatorID,
|
||||||
|
@ -288,6 +290,7 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
|
||||||
|
|
||||||
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
|
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
|
||||||
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
|
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
|
||||||
|
h.metrics.pending.Inc()
|
||||||
h.msgs <- message{
|
h.msgs <- message{
|
||||||
messageType: getAncestorsFailedMsg,
|
messageType: getAncestorsFailedMsg,
|
||||||
validatorID: validatorID,
|
validatorID: validatorID,
|
||||||
|
|
|
@ -64,10 +64,15 @@ func (sr *ChainRouter) AddChain(chain *Handler) {
|
||||||
// RemoveChain removes the specified chain so that incoming
|
// RemoveChain removes the specified chain so that incoming
|
||||||
// messages can't be routed to it
|
// messages can't be routed to it
|
||||||
func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
|
func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
|
||||||
sr.lock.Lock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.Unlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
|
||||||
chain.Shutdown()
|
chain.Shutdown()
|
||||||
close(chain.msgs)
|
close(chain.msgs)
|
||||||
|
|
||||||
|
@ -79,10 +84,9 @@ func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
|
||||||
}
|
}
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
|
|
||||||
|
sr.lock.Lock()
|
||||||
delete(sr.chains, chainID.Key())
|
delete(sr.chains, chainID.Key())
|
||||||
} else {
|
sr.lock.Unlock()
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAcceptedFrontier routes an incoming GetAcceptedFrontier request from the
|
// GetAcceptedFrontier routes an incoming GetAcceptedFrontier request from the
|
||||||
|
@ -90,9 +94,10 @@ func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
|
||||||
// chain with ID [chainID]
|
// chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.GetAcceptedFrontier(validatorID, requestID)
|
chain.GetAcceptedFrontier(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -104,10 +109,11 @@ func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.
|
||||||
// chain with ID [chainID]
|
// chain with ID [chainID]
|
||||||
func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.AcceptedFrontier(validatorID, requestID, containerIDs)
|
chain.AcceptedFrontier(validatorID, requestID, containerIDs)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -119,10 +125,11 @@ func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID,
|
||||||
// working on the chain with ID [chainID]
|
// working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.GetAcceptedFrontierFailed(validatorID, requestID)
|
chain.GetAcceptedFrontierFailed(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -134,9 +141,10 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI
|
||||||
// chain with ID [chainID]
|
// chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.GetAccepted(validatorID, requestID, containerIDs)
|
chain.GetAccepted(validatorID, requestID, containerIDs)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -148,10 +156,11 @@ func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requ
|
||||||
// [chainID]
|
// [chainID]
|
||||||
func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.Accepted(validatorID, requestID, containerIDs)
|
chain.Accepted(validatorID, requestID, containerIDs)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -163,10 +172,11 @@ func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, request
|
||||||
// chain with ID [chainID]
|
// chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.GetAcceptedFailed(validatorID, requestID)
|
chain.GetAcceptedFailed(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -177,9 +187,10 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.Get(validatorID, requestID, containerID)
|
chain.Get(validatorID, requestID, containerID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -191,9 +202,10 @@ func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID ui
|
||||||
// The maximum number of ancestors to respond with is define in snow/engine/commong/bootstrapper.go
|
// The maximum number of ancestors to respond with is define in snow/engine/commong/bootstrapper.go
|
||||||
func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.GetAncestors(validatorID, requestID, containerID)
|
chain.GetAncestors(validatorID, requestID, containerID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -204,12 +216,13 @@ func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, req
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
// This message came in response to a Get message from this node, and when we sent that Get
|
// This message came in response to a Get message from this node, and when we sent that Get
|
||||||
// message we set a timeout. Since we got a response, cancel the timeout.
|
// message we set a timeout. Since we got a response, cancel the timeout.
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
if exists {
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
|
||||||
chain.Put(validatorID, requestID, containerID, container)
|
chain.Put(validatorID, requestID, containerID, container)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -220,12 +233,13 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
|
func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
// This message came in response to a GetAncestors message from this node, and when we sent that
|
// This message came in response to a GetAncestors message from this node, and when we sent that
|
||||||
// message we set a timeout. Since we got a response, cancel the timeout.
|
// message we set a timeout. Since we got a response, cancel the timeout.
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
if exists {
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
|
||||||
chain.MultiPut(validatorID, requestID, containers)
|
chain.MultiPut(validatorID, requestID, containers)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -236,10 +250,11 @@ func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, request
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.GetFailed(validatorID, requestID)
|
chain.GetFailed(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -250,10 +265,11 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.GetAncestorsFailed(validatorID, requestID)
|
chain.GetAncestorsFailed(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -264,9 +280,10 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.PushQuery(validatorID, requestID, containerID, container)
|
chain.PushQuery(validatorID, requestID, containerID, container)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -277,9 +294,10 @@ func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, reques
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.PullQuery(validatorID, requestID, containerID)
|
chain.PullQuery(validatorID, requestID, containerID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -290,11 +308,13 @@ func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, reques
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
|
func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
// Cancel timeout we set when sent the message asking for these Chits
|
// Cancel timeout we set when sent the message asking for these Chits
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
if exists {
|
||||||
chain.Chits(validatorID, requestID, votes)
|
chain.Chits(validatorID, requestID, votes)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -305,10 +325,11 @@ func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID
|
||||||
// to the consensus engine working on the chain with ID [chainID]
|
// to the consensus engine working on the chain with ID [chainID]
|
||||||
func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
|
||||||
sr.lock.RLock()
|
sr.lock.RLock()
|
||||||
defer sr.lock.RUnlock()
|
|
||||||
|
|
||||||
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
sr.timeouts.Cancel(validatorID, chainID, requestID)
|
||||||
if chain, exists := sr.chains[chainID.Key()]; exists {
|
chain, exists := sr.chains[chainID.Key()]
|
||||||
|
sr.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
chain.QueryFailed(validatorID, requestID)
|
chain.QueryFailed(validatorID, requestID)
|
||||||
} else {
|
} else {
|
||||||
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
|
||||||
|
@ -318,14 +339,15 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
|
||||||
// Shutdown shuts down this router
|
// Shutdown shuts down this router
|
||||||
func (sr *ChainRouter) Shutdown() {
|
func (sr *ChainRouter) Shutdown() {
|
||||||
sr.lock.Lock()
|
sr.lock.Lock()
|
||||||
for _, chain := range sr.chains {
|
|
||||||
chain.Shutdown()
|
|
||||||
close(chain.msgs)
|
|
||||||
}
|
|
||||||
prevChains := sr.chains
|
prevChains := sr.chains
|
||||||
sr.chains = map[[32]byte]*Handler{}
|
sr.chains = map[[32]byte]*Handler{}
|
||||||
sr.lock.Unlock()
|
sr.lock.Unlock()
|
||||||
|
|
||||||
|
for _, chain := range prevChains {
|
||||||
|
chain.Shutdown()
|
||||||
|
close(chain.msgs)
|
||||||
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(sr.closeTimeout)
|
ticker := time.NewTicker(sr.closeTimeout)
|
||||||
timedout := false
|
timedout := false
|
||||||
for _, chain := range prevChains {
|
for _, chain := range prevChains {
|
||||||
|
@ -344,10 +366,14 @@ func (sr *ChainRouter) Shutdown() {
|
||||||
|
|
||||||
// Gossip accepted containers
|
// Gossip accepted containers
|
||||||
func (sr *ChainRouter) Gossip() {
|
func (sr *ChainRouter) Gossip() {
|
||||||
sr.lock.RLock()
|
sr.lock.Lock()
|
||||||
defer sr.lock.RUnlock()
|
chains := []*Handler{}
|
||||||
|
|
||||||
for _, chain := range sr.chains {
|
for _, chain := range sr.chains {
|
||||||
|
chains = append(chains, chain)
|
||||||
|
}
|
||||||
|
sr.lock.Unlock()
|
||||||
|
|
||||||
|
for _, chain := range chains {
|
||||||
chain.Gossip()
|
chain.Gossip()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue