diff --git a/node/hack/discord_test/discord.go b/node/hack/discord_test/discord.go index eeeda8559..c94f0a202 100644 --- a/node/hack/discord_test/discord.go +++ b/node/hack/discord_test/discord.go @@ -47,12 +47,12 @@ func main() { logger.Fatal("failed to initialize notifier", zap.Error(err)) } - if err := d.MissingSignaturesOnTransaction(v, 14, 13, true, []string{ + if err := d.MissingSignaturesOnObservation(v, 14, 13, true, []string{ "Certus One", "Not Certus One"}); err != nil { logger.Fatal("failed to send test message", zap.Error(err)) } - if err := d.MissingSignaturesOnTransaction(v, 14, 13, true, []string{ + if err := d.MissingSignaturesOnObservation(v, 14, 13, true, []string{ "Certus One"}); err != nil { logger.Fatal("failed to send test message", zap.Error(err)) } diff --git a/node/pkg/notify/discord/notify.go b/node/pkg/notify/discord/notify.go index 9f0c16853..0e923ed1c 100644 --- a/node/pkg/notify/discord/notify.go +++ b/node/pkg/notify/discord/notify.go @@ -2,6 +2,7 @@ package discord import ( "bytes" + "encoding/hex" "fmt" "strings" "sync" @@ -9,6 +10,7 @@ import ( "github.com/certusone/wormhole/node/pkg/vaa" "github.com/diamondburned/arikawa/v3/api" "github.com/diamondburned/arikawa/v3/discord" + "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" ) @@ -96,7 +98,15 @@ func (d *DiscordNotifier) LookupGroupID(groupName string) (string, error) { return "", fmt.Errorf("failed to find group %s", groupName) } -func (d *DiscordNotifier) MissingSignaturesOnTransaction(v *vaa.VAA, hasSigs, wantSigs int, quorum bool, missing []string) error { +// Observation defines the same interface as processor.Observation but redefined +// here to avoid circular dependencies. +type Observation interface { + GetEmitterChain() vaa.ChainID + MessageID() string + SigningMsg() common.Hash +} + +func (d *DiscordNotifier) MissingSignaturesOnObservation(o Observation, hasSigs, wantSigs int, quorum bool, missing []string) error { if len(missing) == 0 { panic("no missing nodes specified") } @@ -132,10 +142,10 @@ func (d *DiscordNotifier) MissingSignaturesOnTransaction(v *vaa.VAA, hasSigs, wa discord.Embed{ Title: "Message with missing signatures", Fields: []discord.EmbedField{ - {Name: "Message ID", Value: wrapCode(v.MessageID()), Inline: true}, - {Name: "Digest", Value: wrapCode(v.HexDigest()), Inline: true}, + {Name: "Message ID", Value: wrapCode(o.MessageID()), Inline: true}, + {Name: "Digest", Value: wrapCode(hex.EncodeToString(o.SigningMsg().Bytes())), Inline: true}, {Name: "Quorum", Value: quorumText, Inline: true}, - {Name: "Source Chain", Value: strings.Title(v.EmitterChain.String()), Inline: false}, + {Name: "Source Chain", Value: strings.Title(o.GetEmitterChain().String()), Inline: false}, {Name: "Missing Guardians", Value: missingText.String(), Inline: false}, }, }, diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index d2b532e03..0247303f7 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -23,15 +23,18 @@ var ( }) ) -func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte, txhash []byte) { - digest := v.SigningMsg() - +func (p *Processor) broadcastSignature( + o Observation, + signature []byte, + txhash []byte, +) { + digest := o.SigningMsg() obsv := gossipv1.SignedObservation{ Addr: crypto.PubkeyToAddress(p.gk.PublicKey).Bytes(), Hash: digest.Bytes(), Signature: signature, TxHash: txhash, - MessageId: v.MessageID(), + MessageId: o.MessageID(), } w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}} @@ -46,18 +49,18 @@ func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte, txhash []by // Store our VAA in case we're going to submit it to Solana hash := hex.EncodeToString(digest.Bytes()) - if p.state.vaaSignatures[hash] == nil { - p.state.vaaSignatures[hash] = &vaaState{ + if p.state.signatures[hash] == nil { + p.state.signatures[hash] = &state{ firstObserved: time.Now(), signatures: map[ethcommon.Address][]byte{}, source: "loopback", } } - p.state.vaaSignatures[hash].ourVAA = v - p.state.vaaSignatures[hash].ourMsg = msg - p.state.vaaSignatures[hash].source = v.EmitterChain.String() - p.state.vaaSignatures[hash].gs = p.gs // guaranteed to match ourVAA - there's no concurrent access to p.gs + p.state.signatures[hash].ourObservation = o + p.state.signatures[hash].ourMsg = msg + p.state.signatures[hash].source = o.GetEmitterChain().String() + p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs // Fast path for our own signature go func() { p.obsvC <- &obsv }() diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index 69385daf1..a75494a9d 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -7,6 +7,7 @@ import ( "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/db" + "github.com/certusone/wormhole/node/pkg/notify/discord" "github.com/certusone/wormhole/node/pkg/vaa" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -56,45 +57,46 @@ const ( settlementTime = time.Second * 30 ) -// handleCleanup handles periodic retransmissions and cleanup of VAAs +// handleCleanup handles periodic retransmissions and cleanup of observations func (p *Processor) handleCleanup(ctx context.Context) { - p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures))) - aggregationStateEntries.Set(float64(len(p.state.vaaSignatures))) + p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures))) + aggregationStateEntries.Set(float64(len(p.state.signatures))) - for hash, s := range p.state.vaaSignatures { + for hash, s := range p.state.signatures { delta := time.Since(s.firstObserved) switch { - case !s.submitted && s.ourVAA != nil && delta > settlementTime: + case !s.submitted && s.ourObservation != nil && delta > settlementTime: // Expire pending VAAs post settlement time if we have a stored quorum VAA. // // This occurs when we observed a message after the cluster has already reached // consensus on it, causing us to never achieve quorum. - - if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(s.ourVAA)); err == nil { - // If we have a stored quorum VAA, we can safely expire the state. - // - // This is a rare case, and we can safely expire the state, since we - // have a quorum VAA. - p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta)) - aggregationStateLate.Inc() - delete(p.state.vaaSignatures, hash) - break - } else if err != db.ErrVAANotFound { - p.logger.Error("failed to look up VAA in database", - zap.String("digest", hash), - zap.Error(err), - ) + if ourVaa, ok := s.ourObservation.(*VAA); ok { + if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil { + // If we have a stored quorum VAA, we can safely expire the state. + // + // This is a rare case, and we can safely expire the state, since we + // have a quorum VAA. + p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta)) + aggregationStateLate.Inc() + delete(p.state.signatures, hash) + break + } else if err != db.ErrVAANotFound { + p.logger.Error("failed to look up VAA in database", + zap.String("digest", hash), + zap.Error(err), + ) + } } - fallthrough + case !s.settled && delta > settlementTime: - // After 30 seconds, the VAA is considered settled - it's unlikely that more observations will + // After 30 seconds, the observation is considered settled - it's unlikely that more observations will // arrive, barring special circumstances. This is a better time to count misses than submission, // because we submit right when we quorum rather than waiting for all observations to arrive. s.settled = true - // Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available. + // Use either the most recent (in case of a observation we haven't seen) or stored gs, if available. var gs *common.GuardianSet if s.gs != nil { gs = s.gs @@ -107,12 +109,12 @@ func (p *Processor) handleCleanup(ctx context.Context) { quorum := hasSigs >= wantSigs var chain vaa.ChainID - if s.ourVAA != nil { - chain = s.ourVAA.EmitterChain + if s.ourObservation != nil { + chain = s.ourObservation.GetEmitterChain() // If a notifier is configured, send a notification for any missing signatures. // - // Only send a notification if we have a VAA. Otherwise, bogus observations + // Only send a notification if we have a observation. Otherwise, bogus observations // could cause invalid alerts. if p.notifier != nil && hasSigs < len(gs.Keys) { p.logger.Info("sending miss notification", zap.String("digest", hash)) @@ -134,16 +136,16 @@ func (p *Processor) handleCleanup(ctx context.Context) { // Send notification for individual message when quorum has failed or // more than one node is missing. if !quorum || len(missing) > 1 { - go func(v *vaa.VAA, hasSigs, wantSigs int, quorum bool, missing []string) { - if err := p.notifier.MissingSignaturesOnTransaction(v, hasSigs, wantSigs, quorum, missing); err != nil { + go func(o discord.Observation, hasSigs, wantSigs int, quorum bool, missing []string) { + if err := p.notifier.MissingSignaturesOnObservation(o, hasSigs, wantSigs, quorum, missing); err != nil { p.logger.Error("failed to send notification", zap.Error(err)) } - }(s.ourVAA, hasSigs, wantSigs, quorum, missing) + }(s.ourObservation, hasSigs, wantSigs, quorum, missing) } } } - p.logger.Info("VAA considered settled", + p.logger.Info("observation considered settled", zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), @@ -160,26 +162,26 @@ func (p *Processor) handleCleanup(ctx context.Context) { } } case s.submitted && delta.Hours() >= 1: - // We could delete submitted VAAs right away, but then we'd lose context about additional (late) + // We could delete submitted observations right away, but then we'd lose context about additional (late) // observation that come in. Therefore, keep it for a reasonable amount of time. // If a very late observation arrives after cleanup, a nil aggregation state will be created // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian). - p.logger.Info("expiring submitted VAA", zap.String("digest", hash), zap.Duration("delta", delta)) - delete(p.state.vaaSignatures, hash) + p.logger.Info("expiring submitted observation", zap.String("digest", hash), zap.Duration("delta", delta)) + delete(p.state.signatures, hash) aggregationStateExpiration.Inc() case !s.submitted && ((s.ourMsg != nil && s.retryCount >= 14400 /* 120 hours */) || (s.ourMsg == nil && s.retryCount >= 10 /* 5 minutes */)): // Clearly, this horse is dead and continued beatings won't bring it closer to quorum. - p.logger.Info("expiring unsubmitted VAA after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta)) - delete(p.state.vaaSignatures, hash) + p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta)) + delete(p.state.signatures, hash) aggregationStateTimeout.Inc() case !s.submitted && delta.Minutes() >= 5: - // Poor VAA has been unsubmitted for five minutes - clearly, something went wrong. + // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. // If we have previously submitted an observation, we can make another attempt to get it over - // the finish line by rebroadcasting our sig. If we do not have a VAA, it means we either never observed it, + // the finish line by rebroadcasting our sig. If we do not have a observation, it means we either never observed it, // or it got revived by a malfunctioning guardian node, in which case, we can't do anything // about it and just delete it to keep our state nice and lean. if s.ourMsg != nil { - p.logger.Info("resubmitting VAA observation", + p.logger.Info("resubmitting observation", zap.String("digest", hash), zap.Duration("delta", delta), zap.Uint("retry", s.retryCount)) @@ -193,14 +195,14 @@ func (p *Processor) handleCleanup(ctx context.Context) { hasSigs := len(s.signatures) wantSigs := CalculateQuorum(len(p.gs.Keys)) - p.logger.Info("expiring unsubmitted nil VAA", + p.logger.Info("expiring unsubmitted nil observation", zap.String("digest", hash), zap.Duration("delta", delta), zap.Int("have_sigs", hasSigs), zap.Int("required_sigs", wantSigs), zap.Bool("quorum", hasSigs >= wantSigs), ) - delete(p.state.vaaSignatures, hash) + delete(p.state.signatures, hash) aggregationStateUnobserved.Inc() } } diff --git a/node/pkg/processor/injection.go b/node/pkg/processor/injection.go index afc387442..7e5df5d17 100644 --- a/node/pkg/processor/injection.go +++ b/node/pkg/processor/injection.go @@ -42,5 +42,5 @@ func (p *Processor) handleInjection(ctx context.Context, v *vaa.VAA) { zap.String("signature", hex.EncodeToString(s))) vaaInjectionsTotal.Inc() - p.broadcastSignature(v, s, nil) + p.broadcastSignature(&VAA{VAA: *v}, s, nil) } diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index f679c543d..706bcf2ad 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -67,17 +67,19 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat // All nodes will create the exact same VAA and sign its digest. // Consensus is established on this digest. - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: p.gs.Index, - Signatures: nil, - Timestamp: k.Timestamp, - Nonce: k.Nonce, - EmitterChain: k.EmitterChain, - EmitterAddress: k.EmitterAddress, - Payload: k.Payload, - Sequence: k.Sequence, - ConsistencyLevel: k.ConsistencyLevel, + v := &VAA{ + VAA: vaa.VAA{ + Version: vaa.SupportedVAAVersion, + GuardianSetIndex: p.gs.Index, + Signatures: nil, + Timestamp: k.Timestamp, + Nonce: k.Nonce, + EmitterChain: k.EmitterChain, + EmitterAddress: k.EmitterAddress, + Payload: k.Payload, + Sequence: k.Sequence, + ConsistencyLevel: k.ConsistencyLevel, + }, } // A governance message should never be emitted on-chain @@ -98,7 +100,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat // // Exception: if an observation is made within the settlement time (30s), we'll // process it so other nodes won't consider it a miss. - if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)); err == nil { + if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&v.VAA)); err == nil { // unmarshal vaa var existing *vaa.VAA if existing, err = vaa.Unmarshal(vb); err != nil { @@ -156,7 +158,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat messagesSignedTotal.With(prometheus.Labels{ "emitter_chain": k.EmitterChain.String()}).Add(1) - p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: *v, InitiatingTxID: k.TxHash}) + p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: v.VAA, InitiatingTxID: k.TxHash}) p.broadcastSignature(v, s, k.TxHash.Bytes()) } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index edb6c3389..fcb9b383e 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -93,7 +93,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs // Determine which guardian set to use. The following cases are possible: // - // - We have already seen the message and generated ourVAA. In this case, use the guardian set valid at the time, + // - We have already seen the message and generated ourObservation. In this case, use the guardian set valid at the time, // even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state, // and the guardians in the old set stay online and observe and sign messages for the transition period. // @@ -108,8 +108,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs // During an update, vaaState.signatures can contain signatures from *both* guardian sets. // var gs *node_common.GuardianSet - if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil { - gs = p.state.vaaSignatures[hash].gs + if p.state.signatures[hash] != nil && p.state.signatures[hash].gs != nil { + gs = p.state.signatures[hash].gs } else { gs = p.gs } @@ -147,7 +147,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. - if p.state.vaaSignatures[hash] == nil { + if p.state.signatures[hash] == nil { // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like. // However, we have established that a valid guardian has signed it, and therefore we can // already start aggregating signatures for it. @@ -158,20 +158,20 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs observationsUnknownTotal.Inc() - p.state.vaaSignatures[hash] = &vaaState{ + p.state.signatures[hash] = &state{ firstObserved: time.Now(), signatures: map[common.Address][]byte{}, source: "unknown", } } - p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature + p.state.signatures[hash].signatures[their_addr] = m.Signature // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. agg := make([]bool, len(gs.Keys)) var sigs []*vaa.Signature for i, a := range gs.Keys { - s, ok := p.state.vaaSignatures[hash].signatures[a] + s, ok := p.state.signatures[hash].signatures[a] if ok { var bs [65]byte @@ -188,27 +188,13 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs agg[i] = ok } - if p.state.vaaSignatures[hash].ourVAA != nil { - // We have seen it on chain! - // Deep copy the VAA and add signatures - v := p.state.vaaSignatures[hash].ourVAA - signed := &vaa.VAA{ - Version: v.Version, - GuardianSetIndex: v.GuardianSetIndex, - Signatures: sigs, - Timestamp: v.Timestamp, - Nonce: v.Nonce, - Sequence: v.Sequence, - EmitterChain: v.EmitterChain, - EmitterAddress: v.EmitterAddress, - Payload: v.Payload, - ConsistencyLevel: v.ConsistencyLevel, - } + if p.state.signatures[hash].ourObservation != nil { + // We have made this observation on chain! // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. quorum := CalculateQuorum(len(gs.Keys)) - p.logger.Info("aggregation state for VAA", + p.logger.Info("aggregation state for observation", zap.String("digest", hash), zap.Any("set", gs.KeysAsHexStrings()), zap.Uint32("index", gs.Index), @@ -218,32 +204,14 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs zap.Bool("quorum", len(sigs) >= quorum), ) - if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted { - vaaBytes, err := signed.Marshal() - if err != nil { - panic(err) - } - - // Store signed VAA in database. - p.logger.Info("signed VAA with quorum", - zap.String("digest", hash), - zap.Any("vaa", signed), - zap.String("bytes", hex.EncodeToString(vaaBytes)), - zap.String("message_id", signed.MessageID())) - - if err := p.db.StoreSignedVAA(signed); err != nil { - p.logger.Error("failed to store signed VAA", zap.Error(err)) - } - - p.broadcastSignedVAA(signed) - p.attestationEvents.ReportVAAQuorum(signed) - p.state.vaaSignatures[hash].submitted = true + if len(sigs) >= quorum && !p.state.signatures[hash].submitted { + p.state.signatures[hash].ourObservation.HandleQuorum(sigs, hash, p) } else { p.logger.Info("quorum not met or already submitted, doing nothing", zap.String("digest", hash)) } } else { - p.logger.Info("we have not yet seen this VAA - temporarily storing signature", + p.logger.Info("we have not yet seen this observation - temporarily storing signature", zap.String("digest", hash), zap.Bools("aggregation", agg)) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 745e7108d..bb1698ad3 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -21,12 +21,26 @@ import ( ) type ( - // vaaState represents the local view of a given VAA - vaaState struct { + // Observation defines the interface for any events observed by the guardian. + Observation interface { + // GetEmitterChain returns the id of the chain where this event was observed. + GetEmitterChain() vaa.ChainID + // MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple. + MessageID() string + // SigningMsg returns the hash of the signing body of the observation. This is used + // for signature generation and verification. + SigningMsg() ethcommon.Hash + // HandleQuorum finishes processing the observation once a quorum of signatures have + // been received for it. + HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) + } + + // state represents the local view of a given observation + state struct { // First time this digest was seen (possibly even before we observed it ourselves). firstObserved time.Time - // Copy of the VAA we constructed when we made our own observation. - ourVAA *vaa.VAA + // Copy of our observation. + ourObservation Observation // Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging // to either the old or new guardian set. signatures map[ethcommon.Address][]byte @@ -38,17 +52,17 @@ type ( source string // Number of times the cleanup service has attempted to retransmit this VAA. retryCount uint - // Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions. + // Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions. ourMsg []byte // Copy of the guardian set valid at observation/injection time. gs *common.GuardianSet } - vaaMap map[string]*vaaState + observationMap map[string]*state // aggregationState represents the node's aggregation of guardian signatures. aggregationState struct { - vaaSignatures vaaMap + signatures observationMap } ) @@ -137,7 +151,7 @@ func NewProcessor( notifier: notifier, logger: supervisor.Logger(ctx), - state: &aggregationState{vaaMap{}}, + state: &aggregationState{observationMap{}}, ourAddr: crypto.PubkeyToAddress(gk.PublicKey), } } diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go new file mode 100644 index 000000000..285cd59dc --- /dev/null +++ b/node/pkg/processor/vaa.go @@ -0,0 +1,47 @@ +package processor + +import ( + "encoding/hex" + + "github.com/certusone/wormhole/node/pkg/vaa" + "go.uber.org/zap" +) + +type VAA struct { + vaa.VAA +} + +func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { + // Deep copy the observation and add signatures + signed := &vaa.VAA{ + Version: v.Version, + GuardianSetIndex: v.GuardianSetIndex, + Signatures: sigs, + Timestamp: v.Timestamp, + Nonce: v.Nonce, + Sequence: v.Sequence, + EmitterChain: v.EmitterChain, + EmitterAddress: v.EmitterAddress, + Payload: v.Payload, + ConsistencyLevel: v.ConsistencyLevel, + } + vaaBytes, err := signed.Marshal() + if err != nil { + panic(err) + } + + // Store signed VAA in database. + p.logger.Info("signed VAA with quorum", + zap.String("digest", hash), + zap.Any("vaa", signed), + zap.String("bytes", hex.EncodeToString(vaaBytes)), + zap.String("message_id", signed.MessageID())) + + if err := p.db.StoreSignedVAA(signed); err != nil { + p.logger.Error("failed to store signed VAA", zap.Error(err)) + } + + p.broadcastSignedVAA(signed) + p.attestationEvents.ReportVAAQuorum(signed) + p.state.signatures[hash].submitted = true +} diff --git a/node/pkg/vaa/structs.go b/node/pkg/vaa/structs.go index 8f8db458a..691481dce 100644 --- a/node/pkg/vaa/structs.go +++ b/node/pkg/vaa/structs.go @@ -500,6 +500,11 @@ func DecodeTransferPayloadHdr(payload []byte) (*TransferPayloadHdr, error) { return p, nil } +// GetEmitterChain implements the processor.Observation interface for *VAA. +func (v *VAA) GetEmitterChain() ChainID { + return v.EmitterChain +} + // MustWrite calls binary.Write and panics on errors func MustWrite(w io.Writer, order binary.ByteOrder, data interface{}) { if err := binary.Write(w, order, data); err != nil {