From c48c198188495ea63687f11d4e366c27f23a8305 Mon Sep 17 00:00:00 2001 From: Leo Date: Sun, 24 Jan 2021 17:51:21 +0100 Subject: [PATCH] bridge: always sign lockups and store gs in aggregation state This fixes #160 by making sure that nodes will sign *any* lockup they see using their guardian key, and storing the gs in the aggregation state when we see a lockup. --- bridge/pkg/processor/broadcast.go | 1 + bridge/pkg/processor/injection.go | 13 +------ bridge/pkg/processor/lockup.go | 17 +-------- bridge/pkg/processor/observation.go | 53 ++++++++++++++++++++++------- bridge/pkg/processor/processor.go | 1 + 5 files changed, 44 insertions(+), 41 deletions(-) diff --git a/bridge/pkg/processor/broadcast.go b/bridge/pkg/processor/broadcast.go index 6ef1fa960..36271c0f0 100644 --- a/bridge/pkg/processor/broadcast.go +++ b/bridge/pkg/processor/broadcast.go @@ -58,6 +58,7 @@ func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte) { p.state.vaaSignatures[hash].ourVAA = v p.state.vaaSignatures[hash].ourMsg = msg + p.state.vaaSignatures[hash].gs = p.gs // guaranteed to match ourVAA - there's no concurrent access to p.gs // Fast path for our own signature go func() { p.obsvC <- &obsv }() diff --git a/bridge/pkg/processor/injection.go b/bridge/pkg/processor/injection.go index eb67ef361..b20b01e38 100644 --- a/bridge/pkg/processor/injection.go +++ b/bridge/pkg/processor/injection.go @@ -26,16 +26,6 @@ func init() { // handleInjection processes a pre-populated VAA injected locally. func (p *Processor) handleInjection(ctx context.Context, v *vaa.VAA) { - // Check if we're in the guardian set. - us, ok := p.gs.KeyIndex(p.ourAddr) - if !ok { - p.logger.Error("we're not in the guardian set - refusing to sign", - zap.Uint32("index", p.gs.Index), - zap.Stringer("our_addr", p.ourAddr), - zap.Any("set", p.gs.KeysAsHexStrings())) - return - } - // Generate digest of the unsigned VAA. digest, err := v.SigningMsg() if err != nil { @@ -54,8 +44,7 @@ func (p *Processor) handleInjection(ctx context.Context, v *vaa.VAA) { p.logger.Info("observed and signed injected VAA", zap.String("digest", hex.EncodeToString(digest.Bytes())), - zap.String("signature", hex.EncodeToString(s)), - zap.Int("our_index", us)) + zap.String("signature", hex.EncodeToString(s))) vaaInjectionsTotal.Inc() p.broadcastSignature(v, s) diff --git a/bridge/pkg/processor/lockup.go b/bridge/pkg/processor/lockup.go index db85794aa..2568b775d 100644 --- a/bridge/pkg/processor/lockup.go +++ b/bridge/pkg/processor/lockup.go @@ -56,20 +56,6 @@ func (p *Processor) handleLockup(ctx context.Context, k *common.ChainLock) { "source_chain": k.SourceChain.String(), "target_chain": k.TargetChain.String()}).Add(1) - if p.gs == nil { - p.logger.Warn("received observation, but we don't know the guardian set yet") - return - } - - us, ok := p.gs.KeyIndex(p.ourAddr) - if !ok { - p.logger.Error("we're not in the guardian set - refusing to sign", - zap.Uint32("index", p.gs.Index), - zap.Stringer("our_addr", p.ourAddr), - zap.Any("set", p.gs.KeysAsHexStrings())) - return - } - // All nodes will create the exact same VAA and sign its digest. // Consensus is established on this digest. @@ -110,8 +96,7 @@ func (p *Processor) handleLockup(ctx context.Context, k *common.ChainLock) { zap.Stringer("target_chain", k.TargetChain), zap.Stringer("txhash", k.TxHash), zap.String("digest", hex.EncodeToString(digest.Bytes())), - zap.String("signature", hex.EncodeToString(s)), - zap.Int("our_index", us)) + zap.String("signature", hex.EncodeToString(s))) lockupsSignedTotal.With(prometheus.Labels{ "source_chain": k.SourceChain.String(), diff --git a/bridge/pkg/processor/observation.go b/bridge/pkg/processor/observation.go index b97276512..d99947b34 100644 --- a/bridge/pkg/processor/observation.go +++ b/bridge/pkg/processor/observation.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + bridge_common "github.com/certusone/wormhole/bridge/pkg/common" "github.com/prometheus/client_golang/prometheus" "strings" "time" @@ -69,8 +70,10 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. + hash := hex.EncodeToString(m.Hash) + p.logger.Info("received observation", - zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("digest", hash), zap.String("signature", hex.EncodeToString(m.Signature)), zap.String("addr", hex.EncodeToString(m.Addr))) @@ -81,7 +84,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs pk, err := crypto.Ecrecover(m.Hash, m.Signature) if err != nil { p.logger.Warn("failed to verify signature on observation", - zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("digest", hash), zap.String("signature", hex.EncodeToString(m.Signature)), zap.String("addr", hex.EncodeToString(m.Addr)), zap.Error(err)) @@ -95,7 +98,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs if their_addr != signer_pk { p.logger.Info("invalid observation - address does not match pubkey", - zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("digest", hash), zap.String("signature", hex.EncodeToString(m.Signature)), zap.String("addr", hex.EncodeToString(m.Addr)), zap.String("pk", signer_pk.Hex())) @@ -103,12 +106,38 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs return } - // Verify that m.Addr is included in the current guardian set. - _, ok := p.gs.KeyIndex(their_addr) + // Determine which guardian set to use. The following cases are possible: + // + // - We have already seen the lockup and generated ourVAA. In this case, use the guardian set valid at the time, + // even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state, + // and the guardians in the old set stay online and observe and sign lockup for the transition period. + // + // - We have not yet seen the lockup. In this case, we assume the latest guardian set because that's what + // we will store once we do see the lockup. + // + // This ensures that during a guardian set update, a node which observed a given lockup with either the old + // or the new guardian set can achieve consensus, since both the old and the new set would achieve consensus, + // assuming that 2/3+ of the old and the new guardian set have seen the lockup and will periodically attempt + // to retransmit their observations such that nodes who initially dropped the signature will get a 2nd chance. + // + // During an update, vaaState.signatures can contain signatures from *both* guardian sets. + // + var gs *bridge_common.GuardianSet + if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil { + gs = p.state.vaaSignatures[hash].gs + } else { + gs = p.gs + } + + // Verify that m.Addr is included in the guardian set. If it's not, drop the message. In case it's us + // who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually. + _, ok := gs.KeyIndex(their_addr) if !ok { p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?", + zap.String("digest", their_addr.Hex()), zap.String("their_addr", their_addr.Hex()), - zap.Any("current_set", p.gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index), + zap.Any("keys", gs.KeysAsHexStrings()), ) observationsFailedTotal.WithLabelValues("unknown_guardian").Inc() return @@ -123,8 +152,6 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. - hash := hex.EncodeToString(m.Hash) - if p.state.vaaSignatures[hash] == nil { // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like. // However, we have established that a valid guardian has signed it, and therefore we can @@ -145,9 +172,9 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. - agg := make([]bool, len(p.gs.Keys)) + agg := make([]bool, len(gs.Keys)) var sigs []*vaa.Signature - for i, a := range p.gs.Keys { + for i, a := range gs.Keys { s, ok := p.state.vaaSignatures[hash].signatures[a] if ok { @@ -178,12 +205,12 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs } // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. - quorum := CalculateQuorum(len(p.gs.Keys)) + quorum := CalculateQuorum(len(gs.Keys)) p.logger.Info("aggregation state for VAA", zap.String("digest", hash), - zap.Any("set", p.gs.KeysAsHexStrings()), - zap.Uint32("index", p.gs.Index), + zap.Any("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index), zap.Bools("aggregation", agg), zap.Int("required_sigs", quorum), zap.Int("have_sigs", len(sigs)), diff --git a/bridge/pkg/processor/processor.go b/bridge/pkg/processor/processor.go index 7ae6813cd..0176a60e5 100644 --- a/bridge/pkg/processor/processor.go +++ b/bridge/pkg/processor/processor.go @@ -27,6 +27,7 @@ type ( submitted bool retryCount uint ourMsg []byte + gs *common.GuardianSet } vaaMap map[string]*vaaState