From 2b582b10b8b45175c2aca57b3854643c6620c805 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 26 Sep 2022 11:24:30 -0500 Subject: [PATCH] node: governor publish gossip (#1538) * Governor publish gossip Change-Id: I2b8b1ea84a0c411101a7027acd3a27a6d6464d59 * Update the config publish time Change-Id: Ic6abf84befb1c20756da2ff66b15a8325dc46067 * Not setting value on enqueued VAAs correctly Change-Id: I9fd3a5d8fc574f8382125445fa688efdae45b88c * Publish at most 20 VAAs, not 20 per chain Change-Id: Ic9dff99c59ee89d57fd79158844a1fe1a0003112 * Switch to using signed messages Change-Id: I66cddc7477cd477aa77bdadfc346b588f2ae645b * Publish status only once per minute Change-Id: I972fb0cf868e89c6f74ae4441471a55df389f4dd * Minor comment change Change-Id: I0d3e443cbec7edd282f89c1a5cce5d5ec8776d55 --- node/pkg/governor/governor.go | 24 ++-- node/pkg/governor/governor_monitoring.go | 168 ++++++++++++++++++++++- node/pkg/p2p/p2p.go | 6 +- proto/gossip/v1/gossip.proto | 72 ++++++++++ 4 files changed, 258 insertions(+), 12 deletions(-) diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 10b67a1dd..5f9e26b60 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -112,16 +112,20 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool { } type ChainGovernor struct { - db db.GovernorDB - logger *zap.Logger - mutex sync.Mutex - tokens map[tokenKey]*tokenEntry - tokensByCoinGeckoId map[string][]*tokenEntry - chains map[vaa.ChainID]*chainEntry - msgsToPublish []*common.MessagePublication - dayLengthInMinutes int - coinGeckoQuery string - env int + db db.GovernorDB + logger *zap.Logger + mutex sync.Mutex + tokens map[tokenKey]*tokenEntry + tokensByCoinGeckoId map[string][]*tokenEntry + chains map[vaa.ChainID]*chainEntry + msgsToPublish []*common.MessagePublication + dayLengthInMinutes int + coinGeckoQuery string + env int + nextStatusPublishTime time.Time + nextConfigPublishTime time.Time + statusPublishCounter int64 + configPublishCounter int64 } func NewChainGovernor( diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index fd140909b..00bfeb692 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -60,9 +60,22 @@ // - This is a single metric that indicates the total number of enqueued VAAs across all chains. This provides a quick check if // anything is currently being limited. +// The chain governor also publishes the following messages to the gossip network +// +// SignedChainGovernorConfig +// - Published once every five minutes. +// - Contains a list of configured chains, along with the daily limit, big transaction size and current price. +// +// - SignedChainGovernorStatus +// - Published once a minute. +// - Contains a list of configured chains along with their remaining available notional value, the number of enqueued VAAs +// and information on zero or more enqueued VAAs. +// - Only the first 20 enqueued VAAs are include, to constrain the message size. + package governor import ( + "crypto/ecdsa" "fmt" "sort" "time" @@ -73,8 +86,13 @@ import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" + ethCommon "github.com/ethereum/go-ethereum/common" + ethCrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "google.golang.org/protobuf/proto" ) // Admin command to display status to the log. @@ -378,7 +396,7 @@ var ( }) ) -func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) { +func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -431,4 +449,152 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) { } metricTotalEnqueuedVAAs.Set(float64(totalPending)) + + if startTime.After(gov.nextConfigPublishTime) { + gov.publishConfig(hb, sendC, gk, ourAddr) + gov.nextConfigPublishTime = startTime.Add(time.Minute * time.Duration(5)) + } + + if startTime.After(gov.nextStatusPublishTime) { + gov.publishStatus(hb, sendC, startTime, gk, ourAddr) + gov.nextStatusPublishTime = startTime.Add(time.Minute) + } +} + +var governorMessagePrefix = []byte("governor|") + +func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { + chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0) + for _, ce := range gov.chains { + chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{ + ChainId: uint32(ce.emitterChainId), + NotionalLimit: ce.dailyLimit, + BigTransactionSize: ce.bigTransactionSize, + }) + } + + tokens := make([]*gossipv1.ChainGovernorConfig_Token, 0) + for tk, te := range gov.tokens { + price, _ := te.price.Float32() + tokens = append(tokens, &gossipv1.ChainGovernorConfig_Token{ + OriginChainId: uint32(tk.chain), + OriginAddress: "0x" + tk.addr.String(), + Price: price, + }) + } + + gov.configPublishCounter += 1 + payload := &gossipv1.ChainGovernorConfig{ + NodeName: hb.NodeName, + Counter: gov.configPublishCounter, + Timestamp: hb.Timestamp, + Chains: chains, + Tokens: tokens, + } + + b, err := proto.Marshal(payload) + if err != nil { + gov.logger.Error("cgov: failed to marshal config message", zap.Error(err)) + return + } + + digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...)) + + sig, err := ethCrypto.Sign(digest.Bytes(), gk) + if err != nil { + panic(err) + } + + msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorConfig{ + SignedChainGovernorConfig: &gossipv1.SignedChainGovernorConfig{ + Config: b, + Signature: sig, + GuardianAddr: ourAddr.Bytes(), + }}} + + b, err = proto.Marshal(&msg) + if err != nil { + panic(err) + } + + sendC <- b +} + +func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { + chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0) + numEnqueued := 0 + for _, ce := range gov.chains { + value := sumValue(ce.transfers, startTime) + if value >= ce.dailyLimit { + value = 0 + } else { + value = ce.dailyLimit - value + } + + enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0) + for _, pe := range ce.pending { + value, err := computeValue(pe.amount, pe.token) + if err != nil { + gov.logger.Error("cgov: failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err)) + value = 0 + } + + if numEnqueued < 20 { + numEnqueued = numEnqueued + 1 + enqueuedVaas = append(enqueuedVaas, &gossipv1.ChainGovernorStatus_EnqueuedVAA{ + Sequence: pe.dbData.Msg.Sequence, + ReleaseTime: uint32(pe.dbData.ReleaseTime.Unix()), + NotionalValue: value, + TxHash: pe.dbData.Msg.TxHash.String(), + }) + } + } + + emitter := gossipv1.ChainGovernorStatus_Emitter{ + EmitterAddress: "0x" + ce.emitterAddr.String(), + TotalEnqueuedVaas: uint64(len(ce.pending)), + EnqueuedVaas: enqueuedVaas, + } + + chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{ + ChainId: uint32(ce.emitterChainId), + RemainingAvailableNotional: value, + Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, + }) + } + + gov.statusPublishCounter += 1 + payload := &gossipv1.ChainGovernorStatus{ + NodeName: hb.NodeName, + Counter: gov.statusPublishCounter, + Timestamp: hb.Timestamp, + Chains: chains, + } + + b, err := proto.Marshal(payload) + if err != nil { + gov.logger.Error("cgov: failed to marshal status message", zap.Error(err)) + return + } + + digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...)) + + sig, err := ethCrypto.Sign(digest.Bytes(), gk) + if err != nil { + panic(err) + } + + msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorStatus{ + SignedChainGovernorStatus: &gossipv1.SignedChainGovernorStatus{ + Status: b, + Signature: sig, + GuardianAddr: ourAddr.Bytes(), + }}} + + b, err = proto.Marshal(&msg) + if err != nil { + panic(err) + } + + sendC <- b } diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index dd3f4ef05..277071929 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -229,7 +229,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa collectNodeMetrics(ourAddr, h.ID(), heartbeat) if gov != nil { - gov.CollectMetrics(heartbeat) + gov.CollectMetrics(heartbeat, sendC, gk, ourAddr) } b, err := proto.Marshal(heartbeat) @@ -407,6 +407,10 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa obsvReqC <- r } + case *gossipv1.GossipMessage_SignedChainGovernorConfig: + logger.Debug("cgov: received config message") + case *gossipv1.GossipMessage_SignedChainGovernorStatus: + logger.Debug("cgov: received status message") default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type (running outdated software?)", diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 6a281198e..1f0f6039e 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -12,6 +12,8 @@ message GossipMessage { SignedObservationRequest signed_observation_request = 5; SignedBatchObservation signed_batch_observation = 6; SignedBatchVAAWithQuorum signed_batch_vaa_with_quorum = 7; + SignedChainGovernorConfig signed_chain_governor_config = 8; + SignedChainGovernorStatus signed_chain_governor_status = 9; } } @@ -145,3 +147,73 @@ message SignedBatchObservation { message SignedBatchVAAWithQuorum { bytes batch_vaa = 1; } + +// This message is published every five minutes. +message SignedChainGovernorConfig { + // Serialized ChainGovernorConfig message. + bytes config = 1; + + // ECDSA signature using the node's guardian key. + bytes signature = 2; + + // Guardian address that signed this payload (truncated Eth address). + bytes guardian_addr = 3; +} + +message ChainGovernorConfig { + message Chain { + uint32 chain_id = 1; + uint64 notional_limit = 2; + uint64 big_transaction_size = 3; + } + + message Token { + uint32 origin_chain_id = 1; + string origin_address = 2; // human-readable hex-encoded (leading 0x) + float price = 3; + } + + string node_name = 1; + int64 counter = 2; + int64 timestamp = 3; + repeated Chain chains = 4; + repeated Token tokens = 5; +} + +// This message is published every minute. +message SignedChainGovernorStatus { + // Serialized ChainGovernorStatus message. + bytes status = 1; + + // ECDSA signature using the node's guardian key. + bytes signature = 2; + + // Guardian address that signed this payload (truncated Eth address). + bytes guardian_addr = 3; +} + +message ChainGovernorStatus { + message EnqueuedVAA { + uint64 sequence = 1; // Chain and emitter address are assumed. + uint32 release_time = 2; + uint64 notional_value = 3; + string tx_hash = 4; + } + + message Emitter { + string emitter_address = 1; // human-readable hex-encoded (leading 0x) + uint64 total_enqueued_vaas = 2; + repeated EnqueuedVAA enqueued_vaas = 3; // Only the first 20 will be included. + } + + message Chain { + uint32 chain_id = 1; + uint64 remaining_available_notional = 2; + repeated Emitter emitters = 3; + } + + string node_name = 1; + int64 counter = 2; + int64 timestamp = 3; + repeated Chain chains = 4; +}