node: prevent reobservation of unreliable messages (#1627)
This commit is contained in:
parent
1febea03b5
commit
5993a231fa
|
@ -21,6 +21,10 @@ type MessagePublication struct {
|
||||||
EmitterChain vaa.ChainID
|
EmitterChain vaa.ChainID
|
||||||
EmitterAddress vaa.Address
|
EmitterAddress vaa.Address
|
||||||
Payload []byte
|
Payload []byte
|
||||||
|
|
||||||
|
// Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be
|
||||||
|
// reobserved.
|
||||||
|
Unreliable bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (msg *MessagePublication) MessageID() []byte {
|
func (msg *MessagePublication) MessageID() []byte {
|
||||||
|
|
|
@ -178,12 +178,19 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
aggregationStateTimeout.Inc()
|
aggregationStateTimeout.Inc()
|
||||||
case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime:
|
case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime:
|
||||||
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
|
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
|
||||||
// If we have previously submitted an observation, we can make another attempt to get it over
|
// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
|
||||||
// the finish line by sending a re-observation request to the network and rebroadcasting our
|
// it over the finish line by sending a re-observation request to the network and rebroadcasting our
|
||||||
// sig. If we do not have an observation, it means we either never observed it, or it got
|
// sig. If we do not have an observation, it means we either never observed it, or it got
|
||||||
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
|
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
|
||||||
// and just delete it to keep our state nice and lean.
|
// and just delete it to keep our state nice and lean.
|
||||||
if s.ourMsg != nil {
|
if s.ourMsg != nil {
|
||||||
|
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
|
||||||
|
if !s.ourObservation.IsReliable() {
|
||||||
|
p.logger.Info("expiring unsubmitted unreliable observation", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||||
|
delete(p.state.signatures, hash)
|
||||||
|
aggregationStateTimeout.Inc()
|
||||||
|
break
|
||||||
|
}
|
||||||
p.logger.Info("resubmitting observation",
|
p.logger.Info("resubmitting observation",
|
||||||
zap.String("digest", hash),
|
zap.String("digest", hash),
|
||||||
zap.Duration("delta", delta),
|
zap.Duration("delta", delta),
|
||||||
|
|
|
@ -80,6 +80,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
|
||||||
Sequence: k.Sequence,
|
Sequence: k.Sequence,
|
||||||
ConsistencyLevel: k.ConsistencyLevel,
|
ConsistencyLevel: k.ConsistencyLevel,
|
||||||
},
|
},
|
||||||
|
Unreliable: k.Unreliable,
|
||||||
}
|
}
|
||||||
|
|
||||||
// A governance message should never be emitted on-chain
|
// A governance message should never be emitted on-chain
|
||||||
|
|
|
@ -31,6 +31,8 @@ type (
|
||||||
// SigningMsg returns the hash of the signing body of the observation. This is used
|
// SigningMsg returns the hash of the signing body of the observation. This is used
|
||||||
// for signature generation and verification.
|
// for signature generation and verification.
|
||||||
SigningMsg() ethcommon.Hash
|
SigningMsg() ethcommon.Hash
|
||||||
|
// IsReliable returns whether this message is considered reliable meaning it can be reobserved.
|
||||||
|
IsReliable() bool
|
||||||
// HandleQuorum finishes processing the observation once a quorum of signatures have
|
// HandleQuorum finishes processing the observation once a quorum of signatures have
|
||||||
// been received for it.
|
// been received for it.
|
||||||
HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
|
HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
type VAA struct {
|
type VAA struct {
|
||||||
vaa.VAA
|
vaa.VAA
|
||||||
|
Unreliable bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
|
func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
|
||||||
|
@ -45,3 +46,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
|
||||||
p.attestationEvents.ReportVAAQuorum(signed)
|
p.attestationEvents.ReportVAAQuorum(signed)
|
||||||
p.state.signatures[hash].submitted = true
|
p.state.signatures[hash].submitted = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *VAA) IsReliable() bool {
|
||||||
|
return !v.Unreliable
|
||||||
|
}
|
||||||
|
|
|
@ -96,6 +96,8 @@ const (
|
||||||
postMessageInstructionNumAccounts = 9
|
postMessageInstructionNumAccounts = 9
|
||||||
postMessageInstructionID = 0x01
|
postMessageInstructionID = 0x01
|
||||||
postMessageUnreliableInstructionID = 0x08
|
postMessageUnreliableInstructionID = 0x08
|
||||||
|
accountPrefixReliable = "msg"
|
||||||
|
accountPrefixUnreliable = "msu"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PostMessageData represents the user-supplied, untrusted instruction data
|
// PostMessageData represents the user-supplied, untrusted instruction data
|
||||||
|
@ -499,7 +501,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log
|
||||||
}
|
}
|
||||||
|
|
||||||
data := info.Value.Data.GetBinary()
|
data := info.Value.Data.GetBinary()
|
||||||
if string(data[:3]) != "msg" && string(data[:3]) != "msu" {
|
if string(data[:3]) != accountPrefixReliable && string(data[:3]) != accountPrefixUnreliable {
|
||||||
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
|
p2p.DefaultRegistry.AddErrorCount(s.chainID, 1)
|
||||||
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "bad_account_data").Inc()
|
solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "bad_account_data").Inc()
|
||||||
logger.Error("account is not a message account",
|
logger.Error("account is not a message account",
|
||||||
|
@ -534,6 +536,16 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
|
||||||
var txHash eth_common.Hash
|
var txHash eth_common.Hash
|
||||||
copy(txHash[:], acc[:])
|
copy(txHash[:], acc[:])
|
||||||
|
|
||||||
|
var reliable bool
|
||||||
|
switch string(data[:3]) {
|
||||||
|
case accountPrefixReliable:
|
||||||
|
reliable = true
|
||||||
|
case accountPrefixUnreliable:
|
||||||
|
reliable = false
|
||||||
|
default:
|
||||||
|
panic("invalid prefix")
|
||||||
|
}
|
||||||
|
|
||||||
observation := &common.MessagePublication{
|
observation := &common.MessagePublication{
|
||||||
TxHash: txHash,
|
TxHash: txHash,
|
||||||
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
|
Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
|
||||||
|
@ -543,6 +555,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
|
||||||
EmitterAddress: proposal.EmitterAddress,
|
EmitterAddress: proposal.EmitterAddress,
|
||||||
Payload: proposal.Payload,
|
Payload: proposal.Payload,
|
||||||
ConsistencyLevel: proposal.ConsistencyLevel,
|
ConsistencyLevel: proposal.ConsistencyLevel,
|
||||||
|
Unreliable: !reliable,
|
||||||
}
|
}
|
||||||
|
|
||||||
solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()
|
solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()
|
||||||
|
|
Loading…
Reference in New Issue