node: processor: Make observation state generic

We need to reuse almost all of the gossip infrastructure for accounting
transactions, with the only difference being that accounting will use a
`Transfer` message rather than a `VAA`.

Make the observation stored in the processor state generic so that it
can be either a VAA or a Transfer.  The rest of the code is shared.
This commit is contained in:
Chirantan Ekbote 2022-06-03 15:32:16 +09:00 committed by Chirantan Ekbote
parent e538c074d3
commit 7a1b1344a1
10 changed files with 174 additions and 123 deletions

View File

@ -47,12 +47,12 @@ func main() {
logger.Fatal("failed to initialize notifier", zap.Error(err))
}
if err := d.MissingSignaturesOnTransaction(v, 14, 13, true, []string{
if err := d.MissingSignaturesOnObservation(v, 14, 13, true, []string{
"Certus One", "Not Certus One"}); err != nil {
logger.Fatal("failed to send test message", zap.Error(err))
}
if err := d.MissingSignaturesOnTransaction(v, 14, 13, true, []string{
if err := d.MissingSignaturesOnObservation(v, 14, 13, true, []string{
"Certus One"}); err != nil {
logger.Fatal("failed to send test message", zap.Error(err))
}

View File

@ -2,6 +2,7 @@ package discord
import (
"bytes"
"encoding/hex"
"fmt"
"strings"
"sync"
@ -9,6 +10,7 @@ import (
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/diamondburned/arikawa/v3/api"
"github.com/diamondburned/arikawa/v3/discord"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
@ -96,7 +98,15 @@ func (d *DiscordNotifier) LookupGroupID(groupName string) (string, error) {
return "", fmt.Errorf("failed to find group %s", groupName)
}
func (d *DiscordNotifier) MissingSignaturesOnTransaction(v *vaa.VAA, hasSigs, wantSigs int, quorum bool, missing []string) error {
// Observation is the subset of processor.Observation that the notifier uses.
// It is redefined here to avoid circular dependencies (the processor package
// imports this package).
type Observation interface {
// GetEmitterChain returns the chain ID rendered as the notification's "Source Chain".
GetEmitterChain() vaa.ChainID
// MessageID returns the identifier rendered as the notification's "Message ID".
MessageID() string
// SigningMsg returns the hash that is hex-encoded into the notification's "Digest".
SigningMsg() common.Hash
}
func (d *DiscordNotifier) MissingSignaturesOnObservation(o Observation, hasSigs, wantSigs int, quorum bool, missing []string) error {
if len(missing) == 0 {
panic("no missing nodes specified")
}
@ -132,10 +142,10 @@ func (d *DiscordNotifier) MissingSignaturesOnTransaction(v *vaa.VAA, hasSigs, wa
discord.Embed{
Title: "Message with missing signatures",
Fields: []discord.EmbedField{
{Name: "Message ID", Value: wrapCode(v.MessageID()), Inline: true},
{Name: "Digest", Value: wrapCode(v.HexDigest()), Inline: true},
{Name: "Message ID", Value: wrapCode(o.MessageID()), Inline: true},
{Name: "Digest", Value: wrapCode(hex.EncodeToString(o.SigningMsg().Bytes())), Inline: true},
{Name: "Quorum", Value: quorumText, Inline: true},
{Name: "Source Chain", Value: strings.Title(v.EmitterChain.String()), Inline: false},
{Name: "Source Chain", Value: strings.Title(o.GetEmitterChain().String()), Inline: false},
{Name: "Missing Guardians", Value: missingText.String(), Inline: false},
},
},

View File

@ -23,15 +23,18 @@ var (
})
)
func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte, txhash []byte) {
digest := v.SigningMsg()
func (p *Processor) broadcastSignature(
o Observation,
signature []byte,
txhash []byte,
) {
digest := o.SigningMsg()
obsv := gossipv1.SignedObservation{
Addr: crypto.PubkeyToAddress(p.gk.PublicKey).Bytes(),
Hash: digest.Bytes(),
Signature: signature,
TxHash: txhash,
MessageId: v.MessageID(),
MessageId: o.MessageID(),
}
w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
@ -46,18 +49,18 @@ func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte, txhash []by
// Store our VAA in case we're going to submit it to Solana
hash := hex.EncodeToString(digest.Bytes())
if p.state.vaaSignatures[hash] == nil {
p.state.vaaSignatures[hash] = &vaaState{
if p.state.signatures[hash] == nil {
p.state.signatures[hash] = &state{
firstObserved: time.Now(),
signatures: map[ethcommon.Address][]byte{},
source: "loopback",
}
}
p.state.vaaSignatures[hash].ourVAA = v
p.state.vaaSignatures[hash].ourMsg = msg
p.state.vaaSignatures[hash].source = v.EmitterChain.String()
p.state.vaaSignatures[hash].gs = p.gs // guaranteed to match ourVAA - there's no concurrent access to p.gs
p.state.signatures[hash].ourObservation = o
p.state.signatures[hash].ourMsg = msg
p.state.signatures[hash].source = o.GetEmitterChain().String()
p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
// Fast path for our own signature
go func() { p.obsvC <- &obsv }()

View File

@ -7,6 +7,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/notify/discord"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -56,45 +57,46 @@ const (
settlementTime = time.Second * 30
)
// handleCleanup handles periodic retransmissions and cleanup of VAAs
// handleCleanup handles periodic retransmissions and cleanup of observations
func (p *Processor) handleCleanup(ctx context.Context) {
p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures)))
aggregationStateEntries.Set(float64(len(p.state.vaaSignatures)))
p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures)))
aggregationStateEntries.Set(float64(len(p.state.signatures)))
for hash, s := range p.state.vaaSignatures {
for hash, s := range p.state.signatures {
delta := time.Since(s.firstObserved)
switch {
case !s.submitted && s.ourVAA != nil && delta > settlementTime:
case !s.submitted && s.ourObservation != nil && delta > settlementTime:
// Expire pending VAAs post settlement time if we have a stored quorum VAA.
//
// This occurs when we observed a message after the cluster has already reached
// consensus on it, causing us to never achieve quorum.
if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(s.ourVAA)); err == nil {
// If we have a stored quorum VAA, we can safely expire the state.
//
// This is a rare case, and we can safely expire the state, since we
// have a quorum VAA.
p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta))
aggregationStateLate.Inc()
delete(p.state.vaaSignatures, hash)
break
} else if err != db.ErrVAANotFound {
p.logger.Error("failed to look up VAA in database",
zap.String("digest", hash),
zap.Error(err),
)
if ourVaa, ok := s.ourObservation.(*VAA); ok {
if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
// If we have a stored quorum VAA, we can safely expire the state.
//
// This is a rare case, and we can safely expire the state, since we
// have a quorum VAA.
p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta))
aggregationStateLate.Inc()
delete(p.state.signatures, hash)
break
} else if err != db.ErrVAANotFound {
p.logger.Error("failed to look up VAA in database",
zap.String("digest", hash),
zap.Error(err),
)
}
}
fallthrough
case !s.settled && delta > settlementTime:
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
// After 30 seconds, the observation is considered settled - it's unlikely that more observations will
// arrive, barring special circumstances. This is a better time to count misses than submission,
// because we submit right when we quorum rather than waiting for all observations to arrive.
s.settled = true
// Use either the most recent (in case of a VAA we haven't seen) or stored gs, if available.
// Use either the most recent (in case of an observation we haven't seen) or stored gs, if available.
var gs *common.GuardianSet
if s.gs != nil {
gs = s.gs
@ -107,12 +109,12 @@ func (p *Processor) handleCleanup(ctx context.Context) {
quorum := hasSigs >= wantSigs
var chain vaa.ChainID
if s.ourVAA != nil {
chain = s.ourVAA.EmitterChain
if s.ourObservation != nil {
chain = s.ourObservation.GetEmitterChain()
// If a notifier is configured, send a notification for any missing signatures.
//
// Only send a notification if we have a VAA. Otherwise, bogus observations
// Only send a notification if we have an observation. Otherwise, bogus observations
// could cause invalid alerts.
if p.notifier != nil && hasSigs < len(gs.Keys) {
p.logger.Info("sending miss notification", zap.String("digest", hash))
@ -134,16 +136,16 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// Send notification for individual message when quorum has failed or
// more than one node is missing.
if !quorum || len(missing) > 1 {
go func(v *vaa.VAA, hasSigs, wantSigs int, quorum bool, missing []string) {
if err := p.notifier.MissingSignaturesOnTransaction(v, hasSigs, wantSigs, quorum, missing); err != nil {
go func(o discord.Observation, hasSigs, wantSigs int, quorum bool, missing []string) {
if err := p.notifier.MissingSignaturesOnObservation(o, hasSigs, wantSigs, quorum, missing); err != nil {
p.logger.Error("failed to send notification", zap.Error(err))
}
}(s.ourVAA, hasSigs, wantSigs, quorum, missing)
}(s.ourObservation, hasSigs, wantSigs, quorum, missing)
}
}
}
p.logger.Info("VAA considered settled",
p.logger.Info("observation considered settled",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
@ -160,26 +162,26 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}
}
case s.submitted && delta.Hours() >= 1:
// We could delete submitted VAAs right away, but then we'd lose context about additional (late)
// We could delete submitted observations right away, but then we'd lose context about additional (late)
// observation that come in. Therefore, keep it for a reasonable amount of time.
// If a very late observation arrives after cleanup, a nil aggregation state will be created
// and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
p.logger.Info("expiring submitted VAA", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.vaaSignatures, hash)
p.logger.Info("expiring submitted observation", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.signatures, hash)
aggregationStateExpiration.Inc()
case !s.submitted && ((s.ourMsg != nil && s.retryCount >= 14400 /* 120 hours */) || (s.ourMsg == nil && s.retryCount >= 10 /* 5 minutes */)):
// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
p.logger.Info("expiring unsubmitted VAA after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.vaaSignatures, hash)
p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
case !s.submitted && delta.Minutes() >= 5:
// Poor VAA has been unsubmitted for five minutes - clearly, something went wrong.
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
// If we have previously submitted an observation, we can make another attempt to get it over
// the finish line by rebroadcasting our sig. If we do not have a VAA, it means we either never observed it,
// the finish line by rebroadcasting our sig. If we do not have an observation, it means we either never observed it,
// or it got revived by a malfunctioning guardian node, in which case, we can't do anything
// about it and just delete it to keep our state nice and lean.
if s.ourMsg != nil {
p.logger.Info("resubmitting VAA observation",
p.logger.Info("resubmitting observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Uint("retry", s.retryCount))
@ -193,14 +195,14 @@ func (p *Processor) handleCleanup(ctx context.Context) {
hasSigs := len(s.signatures)
wantSigs := CalculateQuorum(len(p.gs.Keys))
p.logger.Info("expiring unsubmitted nil VAA",
p.logger.Info("expiring unsubmitted nil observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
)
delete(p.state.vaaSignatures, hash)
delete(p.state.signatures, hash)
aggregationStateUnobserved.Inc()
}
}

View File

@ -42,5 +42,5 @@ func (p *Processor) handleInjection(ctx context.Context, v *vaa.VAA) {
zap.String("signature", hex.EncodeToString(s)))
vaaInjectionsTotal.Inc()
p.broadcastSignature(v, s, nil)
p.broadcastSignature(&VAA{VAA: *v}, s, nil)
}

View File

@ -67,17 +67,19 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
// All nodes will create the exact same VAA and sign its digest.
// Consensus is established on this digest.
v := &vaa.VAA{
Version: vaa.SupportedVAAVersion,
GuardianSetIndex: p.gs.Index,
Signatures: nil,
Timestamp: k.Timestamp,
Nonce: k.Nonce,
EmitterChain: k.EmitterChain,
EmitterAddress: k.EmitterAddress,
Payload: k.Payload,
Sequence: k.Sequence,
ConsistencyLevel: k.ConsistencyLevel,
v := &VAA{
VAA: vaa.VAA{
Version: vaa.SupportedVAAVersion,
GuardianSetIndex: p.gs.Index,
Signatures: nil,
Timestamp: k.Timestamp,
Nonce: k.Nonce,
EmitterChain: k.EmitterChain,
EmitterAddress: k.EmitterAddress,
Payload: k.Payload,
Sequence: k.Sequence,
ConsistencyLevel: k.ConsistencyLevel,
},
}
// A governance message should never be emitted on-chain
@ -98,7 +100,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
//
// Exception: if an observation is made within the settlement time (30s), we'll
// process it so other nodes won't consider it a miss.
if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)); err == nil {
if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&v.VAA)); err == nil {
// unmarshal vaa
var existing *vaa.VAA
if existing, err = vaa.Unmarshal(vb); err != nil {
@ -156,7 +158,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
messagesSignedTotal.With(prometheus.Labels{
"emitter_chain": k.EmitterChain.String()}).Add(1)
p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: *v, InitiatingTxID: k.TxHash})
p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: v.VAA, InitiatingTxID: k.TxHash})
p.broadcastSignature(v, s, k.TxHash.Bytes())
}

View File

@ -93,7 +93,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
// Determine which guardian set to use. The following cases are possible:
//
// - We have already seen the message and generated ourVAA. In this case, use the guardian set valid at the time,
// - We have already seen the message and generated ourObservation. In this case, use the guardian set valid at the time,
// even if the guardian set was updated. Old guardian sets remain valid for longer than aggregation state,
// and the guardians in the old set stay online and observe and sign messages for the transition period.
//
@ -108,8 +108,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
// During an update, state.signatures can contain signatures from *both* guardian sets.
//
var gs *node_common.GuardianSet
if p.state.vaaSignatures[hash] != nil && p.state.vaaSignatures[hash].gs != nil {
gs = p.state.vaaSignatures[hash].gs
if p.state.signatures[hash] != nil && p.state.signatures[hash].gs != nil {
gs = p.state.signatures[hash].gs
} else {
gs = p.gs
}
@ -147,7 +147,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
// []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
if p.state.vaaSignatures[hash] == nil {
if p.state.signatures[hash] == nil {
// We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
// However, we have established that a valid guardian has signed it, and therefore we can
// already start aggregating signatures for it.
@ -158,20 +158,20 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
observationsUnknownTotal.Inc()
p.state.vaaSignatures[hash] = &vaaState{
p.state.signatures[hash] = &state{
firstObserved: time.Now(),
signatures: map[common.Address][]byte{},
source: "unknown",
}
}
p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature
p.state.signatures[hash].signatures[their_addr] = m.Signature
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gs.Keys))
var sigs []*vaa.Signature
for i, a := range gs.Keys {
s, ok := p.state.vaaSignatures[hash].signatures[a]
s, ok := p.state.signatures[hash].signatures[a]
if ok {
var bs [65]byte
@ -188,27 +188,13 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
agg[i] = ok
}
if p.state.vaaSignatures[hash].ourVAA != nil {
// We have seen it on chain!
// Deep copy the VAA and add signatures
v := p.state.vaaSignatures[hash].ourVAA
signed := &vaa.VAA{
Version: v.Version,
GuardianSetIndex: v.GuardianSetIndex,
Signatures: sigs,
Timestamp: v.Timestamp,
Nonce: v.Nonce,
Sequence: v.Sequence,
EmitterChain: v.EmitterChain,
EmitterAddress: v.EmitterAddress,
Payload: v.Payload,
ConsistencyLevel: v.ConsistencyLevel,
}
if p.state.signatures[hash].ourObservation != nil {
// We have made this observation on chain!
// 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
quorum := CalculateQuorum(len(gs.Keys))
p.logger.Info("aggregation state for VAA",
p.logger.Info("aggregation state for observation",
zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
@ -218,32 +204,14 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
zap.Bool("quorum", len(sigs) >= quorum),
)
if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted {
vaaBytes, err := signed.Marshal()
if err != nil {
panic(err)
}
// Store signed VAA in database.
p.logger.Info("signed VAA with quorum",
zap.String("digest", hash),
zap.Any("vaa", signed),
zap.String("bytes", hex.EncodeToString(vaaBytes)),
zap.String("message_id", signed.MessageID()))
if err := p.db.StoreSignedVAA(signed); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
}
p.broadcastSignedVAA(signed)
p.attestationEvents.ReportVAAQuorum(signed)
p.state.vaaSignatures[hash].submitted = true
if len(sigs) >= quorum && !p.state.signatures[hash].submitted {
p.state.signatures[hash].ourObservation.HandleQuorum(sigs, hash, p)
} else {
p.logger.Info("quorum not met or already submitted, doing nothing",
zap.String("digest", hash))
}
} else {
p.logger.Info("we have not yet seen this VAA - temporarily storing signature",
p.logger.Info("we have not yet seen this observation - temporarily storing signature",
zap.String("digest", hash),
zap.Bools("aggregation", agg))

View File

@ -21,12 +21,26 @@ import (
)
type (
// vaaState represents the local view of a given VAA
vaaState struct {
// Observation defines the interface for any events observed by the guardian.
Observation interface {
// GetEmitterChain returns the id of the chain where this event was observed.
GetEmitterChain() vaa.ChainID
// MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple.
MessageID() string
// SigningMsg returns the hash of the signing body of the observation. This is used
// for signature generation and verification.
SigningMsg() ethcommon.Hash
// HandleQuorum finishes processing the observation once a quorum of signatures have
// been received for it.
HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
}
// state represents the local view of a given observation
state struct {
// First time this digest was seen (possibly even before we observed it ourselves).
firstObserved time.Time
// Copy of the VAA we constructed when we made our own observation.
ourVAA *vaa.VAA
// Copy of our observation.
ourObservation Observation
// Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
// to either the old or new guardian set.
signatures map[ethcommon.Address][]byte
@ -38,17 +52,17 @@ type (
source string
// Number of times the cleanup service has attempted to retransmit this VAA.
retryCount uint
// Copy of the bytes we submitted (ourVAA, but signed and serialized). Used for retransmissions.
// Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions.
ourMsg []byte
// Copy of the guardian set valid at observation/injection time.
gs *common.GuardianSet
}
vaaMap map[string]*vaaState
observationMap map[string]*state
// aggregationState represents the node's aggregation of guardian signatures.
aggregationState struct {
vaaSignatures vaaMap
signatures observationMap
}
)
@ -137,7 +151,7 @@ func NewProcessor(
notifier: notifier,
logger: supervisor.Logger(ctx),
state: &aggregationState{vaaMap{}},
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
}
}

47
node/pkg/processor/vaa.go Normal file
View File

@ -0,0 +1,47 @@
package processor
import (
"encoding/hex"
"github.com/certusone/wormhole/node/pkg/vaa"
"go.uber.org/zap"
)
// VAA embeds vaa.VAA so that the processor package can attach its own methods
// to it (HandleQuorum is defined on *VAA in this file).
type VAA struct {
vaa.VAA
}
// HandleQuorum finishes processing a VAA once a quorum of signatures has been
// collected for it.
//
// It builds a signed copy of v carrying sigs, logs it, stores it in the
// database, broadcasts it, reports the quorum event, and finally marks the
// aggregation state entry for hash as submitted.
//
// sigs are the aggregated guardian signatures, hash is the hex digest used as
// the key into p.state.signatures, and p is the processor that owns the state.
//
// The copy is made field by field with plain assignment; nothing is cloned.
// It panics if the signed VAA cannot be marshaled. A failure to store the VAA
// is only logged; the broadcast and report still happen.
func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
	// Copy our observation's fields and swap in the collected signatures.
	signedVAA := &vaa.VAA{
		Signatures:       sigs,
		Version:          v.Version,
		GuardianSetIndex: v.GuardianSetIndex,
		Timestamp:        v.Timestamp,
		Nonce:            v.Nonce,
		Sequence:         v.Sequence,
		ConsistencyLevel: v.ConsistencyLevel,
		EmitterChain:     v.EmitterChain,
		EmitterAddress:   v.EmitterAddress,
		Payload:          v.Payload,
	}

	serialized, err := signedVAA.Marshal()
	if err != nil {
		panic(err)
	}

	p.logger.Info("signed VAA with quorum",
		zap.String("digest", hash),
		zap.Any("vaa", signedVAA),
		zap.String("bytes", hex.EncodeToString(serialized)),
		zap.String("message_id", signedVAA.MessageID()))

	// Store signed VAA in database.
	if storeErr := p.db.StoreSignedVAA(signedVAA); storeErr != nil {
		p.logger.Error("failed to store signed VAA", zap.Error(storeErr))
	}

	p.broadcastSignedVAA(signedVAA)
	p.attestationEvents.ReportVAAQuorum(signedVAA)

	// Record submission so that later observations do not trigger it again.
	entry := p.state.signatures[hash]
	entry.submitted = true
}

View File

@ -500,6 +500,11 @@ func DecodeTransferPayloadHdr(payload []byte) (*TransferPayloadHdr, error) {
return p, nil
}
// GetEmitterChain implements the processor.Observation interface for *VAA.
// It returns the VAA's EmitterChain field.
func (v *VAA) GetEmitterChain() ChainID {
return v.EmitterChain
}
// MustWrite calls binary.Write and panics on errors
func MustWrite(w io.Writer, order binary.ByteOrder, data interface{}) {
if err := binary.Write(w, order, data); err != nil {