diff --git a/node/pkg/common/chainlock.go b/node/pkg/common/chainlock.go index e42cee699..34f451355 100644 --- a/node/pkg/common/chainlock.go +++ b/node/pkg/common/chainlock.go @@ -21,6 +21,10 @@ type MessagePublication struct { EmitterChain vaa.ChainID EmitterAddress vaa.Address Payload []byte + + // Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be + // reobserved. + Unreliable bool } func (msg *MessagePublication) MessageID() []byte { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index a8accdab8..e406f6969 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -178,12 +178,19 @@ func (p *Processor) handleCleanup(ctx context.Context) { aggregationStateTimeout.Inc() case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime: // Poor observation has been unsubmitted for five minutes - clearly, something went wrong. - // If we have previously submitted an observation, we can make another attempt to get it over - // the finish line by sending a re-observation request to the network and rebroadcasting our + // If we have previously submitted an observation, and it was reliable, we can make another attempt to get + // it over the finish line by sending a re-observation request to the network and rebroadcasting our // sig. If we do not have an observation, it means we either never observed it, or it got // revived by a malfunctioning guardian node, in which case, we can't do anything about it // and just delete it to keep our state nice and lean. if s.ourMsg != nil { + // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes + if !s.ourObservation.IsReliable() { + p.logger.Info("expiring unsubmitted unreliable observation", zap.String("digest", hash), zap.Duration("delta", delta)) + delete(p.state.signatures, hash) + aggregationStateTimeout.Inc() + break + } p.logger.Info("resubmitting observation", zap.String("digest", hash), zap.Duration("delta", delta), diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index 940d82ce5..681d19347 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -80,6 +80,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat Sequence: k.Sequence, ConsistencyLevel: k.ConsistencyLevel, }, + Unreliable: k.Unreliable, } // A governance message should never be emitted on-chain diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 900d7bd7f..a558405e3 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -31,6 +31,8 @@ type ( // SigningMsg returns the hash of the signing body of the observation. This is used // for signature generation and verification. SigningMsg() ethcommon.Hash + // IsReliable returns whether this message is considered reliable meaning it can be reobserved. + IsReliable() bool // HandleQuorum finishes processing the observation once a quorum of signatures have // been received for it. HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 731ef4574..a378a576b 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -9,6 +9,7 @@ import ( type VAA struct { vaa.VAA + Unreliable bool } func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { @@ -45,3 +46,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { p.attestationEvents.ReportVAAQuorum(signed) p.state.signatures[hash].submitted = true } + +func (v *VAA) IsReliable() bool { + return !v.Unreliable +} diff --git a/node/pkg/solana/client.go b/node/pkg/solana/client.go index bd2117a55..f30321129 100644 --- a/node/pkg/solana/client.go +++ b/node/pkg/solana/client.go @@ -96,6 +96,8 @@ const ( postMessageInstructionNumAccounts = 9 postMessageInstructionID = 0x01 postMessageUnreliableInstructionID = 0x08 + accountPrefixReliable = "msg" + accountPrefixUnreliable = "msu" ) // PostMessageData represents the user-supplied, untrusted instruction data @@ -499,7 +501,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log } data := info.Value.Data.GetBinary() - if string(data[:3]) != "msg" && string(data[:3]) != "msu" { + if string(data[:3]) != accountPrefixReliable && string(data[:3]) != accountPrefixUnreliable { p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "bad_account_data").Inc() logger.Error("account is not a message account", @@ -534,6 +536,16 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a var txHash eth_common.Hash copy(txHash[:], acc[:]) + var reliable bool + switch string(data[:3]) { + case accountPrefixReliable: + reliable = true + case accountPrefixUnreliable: + reliable = false + default: + panic("invalid prefix") + } + observation := &common.MessagePublication{ TxHash: txHash, Timestamp: time.Unix(int64(proposal.SubmissionTime), 0), @@ -543,6 +555,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a EmitterAddress: proposal.EmitterAddress, Payload: proposal.Payload, ConsistencyLevel: proposal.ConsistencyLevel, + Unreliable: !reliable, } solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()