From 367b5c30db97cebf7db402896e656ab5ceaee8e4 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Mon, 8 Jun 2020 04:05:50 -0400 Subject: [PATCH] Removed potential deadlock in registering a new chain --- snow/networking/router/handler.go | 3 + snow/networking/router/subnet_router.go | 156 ++++++++++++++---------- 2 files changed, 94 insertions(+), 65 deletions(-) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index c4c03f0..fafb62a 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -246,6 +246,7 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids // GetAncestors passes a GetAncestors message received from the network to the consensus engine. func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAncestorsMsg, validatorID: validatorID, @@ -268,6 +269,7 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids // MultiPut passes a MultiPut message received from the network to the consensus engine. func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: multiPutMsg, validatorID: validatorID, @@ -288,6 +290,7 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { // GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine. func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAncestorsFailedMsg, validatorID: validatorID, diff --git a/snow/networking/router/subnet_router.go b/snow/networking/router/subnet_router.go index 26b02af..be58910 100644 --- a/snow/networking/router/subnet_router.go +++ b/snow/networking/router/subnet_router.go @@ -64,25 +64,29 @@ func (sr *ChainRouter) AddChain(chain *Handler) { // RemoveChain removes the specified chain so that incoming // messages can't be routed to it func (sr *ChainRouter) RemoveChain(chainID ids.ID) { - sr.lock.Lock() - defer sr.lock.Unlock() + sr.lock.RLock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { - chain.Shutdown() - close(chain.msgs) - - ticker := time.NewTicker(sr.closeTimeout) - select { - case _, _ = <-chain.closed: - case <-ticker.C: - chain.Context().Log.Warn("timed out while shutting down") - } - ticker.Stop() - - delete(sr.chains, chainID.Key()) - } else { + if !exists { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) + return } + + chain.Shutdown() + close(chain.msgs) + + ticker := time.NewTicker(sr.closeTimeout) + select { + case _, _ = <-chain.closed: + case <-ticker.C: + chain.Context().Log.Warn("timed out while shutting down") + } + ticker.Stop() + + sr.lock.Lock() + delete(sr.chains, chainID.Key()) + sr.lock.Unlock() } // GetAcceptedFrontier routes an incoming GetAcceptedFrontier request from the @@ -90,9 +94,10 @@ func (sr *ChainRouter) RemoveChain(chainID ids.ID) { // chain with ID [chainID] func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.GetAcceptedFrontier(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -104,10 +109,11 @@ func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids. // chain with ID [chainID] func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.AcceptedFrontier(validatorID, requestID, containerIDs) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -119,10 +125,11 @@ func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, // working on the chain with ID [chainID] func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.GetAcceptedFrontierFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -134,9 +141,10 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI // chain with ID [chainID] func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.GetAccepted(validatorID, requestID, containerIDs) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -148,10 +156,11 @@ func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requ // [chainID] func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.Accepted(validatorID, requestID, containerIDs) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -163,10 +172,11 @@ func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, request // chain with ID [chainID] func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.GetAcceptedFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -177,9 +187,10 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.Get(validatorID, requestID, containerID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -191,9 +202,10 @@ func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID ui // The maximum number of ancestors to respond with is define in snow/engine/commong/bootstrapper.go func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.GetAncestors(validatorID, requestID, containerID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -204,12 +216,13 @@ func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, req // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) { sr.lock.RLock() - defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() // This message came in response to a Get message from this node, and when we sent that Get // message we set a timeout. Since we got a response, cancel the timeout. - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.Put(validatorID, requestID, containerID, container) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -220,12 +233,13 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) { sr.lock.RLock() - defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() // This message came in response to a GetAncestors message from this node, and when we sent that // message we set a timeout. Since we got a response, cancel the timeout. - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.MultiPut(validatorID, requestID, containers) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -236,10 +250,11 @@ func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, request // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.GetFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -250,10 +265,11 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.GetAncestorsFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -264,9 +280,10 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.PushQuery(validatorID, requestID, containerID, container) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -277,9 +294,10 @@ func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, reques // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) { sr.lock.RLock() - defer sr.lock.RUnlock() + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() - if chain, exists := sr.chains[chainID.Key()]; exists { + if exists { chain.PullQuery(validatorID, requestID, containerID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -290,11 +308,13 @@ func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, reques // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) { sr.lock.RLock() - defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() // Cancel timeout we set when sent the message asking for these Chits - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + + if exists { chain.Chits(validatorID, requestID, votes) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -305,10 +325,11 @@ func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID // to the consensus engine working on the chain with ID [chainID] func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() - defer sr.lock.RUnlock() - sr.timeouts.Cancel(validatorID, chainID, requestID) - if chain, exists := sr.chains[chainID.Key()]; exists { + chain, exists := sr.chains[chainID.Key()] + sr.lock.RUnlock() + + if exists { chain.QueryFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) @@ -318,14 +339,15 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ // Shutdown shuts down this router func (sr *ChainRouter) Shutdown() { sr.lock.Lock() - for _, chain := range sr.chains { - chain.Shutdown() - close(chain.msgs) - } prevChains := sr.chains sr.chains = map[[32]byte]*Handler{} sr.lock.Unlock() + for _, chain := range prevChains { + chain.Shutdown() + close(chain.msgs) + } + ticker := time.NewTicker(sr.closeTimeout) timedout := false for _, chain := range prevChains { @@ -344,10 +366,14 @@ func (sr *ChainRouter) Shutdown() { // Gossip accepted containers func (sr *ChainRouter) Gossip() { - sr.lock.RLock() - defer sr.lock.RUnlock() - + sr.lock.Lock() + chains := []*Handler{} for _, chain := range sr.chains { + chains = append(chains, chain) + } + sr.lock.Unlock() + + for _, chain := range chains { chain.Gossip() } }