More governor code review rework (#1404)

* More code review rework

Change-Id: I0ae094b16a8e0469f83f711e7118936abf70b109

* Fix lint error

Change-Id: Ie3bdcc61a8c475018a8f72c9cc84678779447d16

* Code review rework

Change-Id: Ief1119e7c9687db855fbee90d32f631630380e60

* Add features list to gossip heartbeat message
This commit is contained in:
bruce-riley 2022-08-09 22:22:14 -05:00 committed by GitHub
parent cc5ecb315e
commit 8557b6d232
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 197 additions and 125 deletions

View File

@ -132,6 +132,7 @@ spec:
- /tmp/admin.sock
- --dataDir
- /tmp/data
# - --chainGovernorEnabled=true
# - --logLevel=debug
securityContext:
capabilities:

View File

@ -13,6 +13,37 @@ import (
"go.uber.org/zap"
)
type GovernorDB interface {
StoreTransfer(t *Transfer) error
StorePendingMsg(k *common.MessagePublication) error
DeleteTransfer(t *Transfer) error
DeletePendingMsg(k *common.MessagePublication) error
GetChainGovernorData(logger *zap.Logger) (transfers []*Transfer, pending []*common.MessagePublication, err error)
}
type MockGovernorDB struct {
}
func (d *MockGovernorDB) StoreTransfer(t *Transfer) error {
return nil
}
func (d *MockGovernorDB) StorePendingMsg(k *common.MessagePublication) error {
return nil
}
func (d *MockGovernorDB) DeleteTransfer(t *Transfer) error {
return nil
}
func (d *MockGovernorDB) DeletePendingMsg(k *common.MessagePublication) error {
return nil
}
func (d *MockGovernorDB) GetChainGovernorData(logger *zap.Logger) (transfers []*Transfer, pending []*common.MessagePublication, err error) {
return nil, nil, nil
}
type Transfer struct {
Timestamp time.Time
Value uint64

View File

@ -7,9 +7,7 @@ import (
)
func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []chainConfigEntry) {
if gov.logger != nil {
gov.logger.Info("cgov: setting up devnet config")
}
gov.logger.Info("cgov: setting up devnet config")
gov.dayLengthInMinutes = 5

View File

@ -85,7 +85,7 @@ type (
// Payload for each enqueued transfer
pendingEntry struct {
timeStamp time.Time
token *tokenEntry
token *tokenEntry // Store a reference to the token so we can get the current price to compute the value each interval.
amount *big.Int
msg *common.MessagePublication
}
@ -101,7 +101,7 @@ type (
)
type ChainGovernor struct {
db *db.Database
db db.GovernorDB
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry
@ -115,7 +115,7 @@ type ChainGovernor struct {
func NewChainGovernor(
logger *zap.Logger,
db *db.Database,
db db.GovernorDB,
env int,
) *ChainGovernor {
return &ChainGovernor{
@ -129,9 +129,7 @@ func NewChainGovernor(
}
func (gov *ChainGovernor) Run(ctx context.Context) error {
if gov.logger != nil {
gov.logger.Info("cgov: starting chain governor")
}
gov.logger.Info("cgov: starting chain governor")
if err := gov.initConfig(); err != nil {
return err
@ -187,16 +185,14 @@ func (gov *ChainGovernor) initConfig() error {
te := &tokenEntry{cfgPrice: cfgPrice, price: initialPrice, decimals: decimals, symbol: ct.symbol, coinGeckoId: ct.coinGeckoId, token: key}
te.updatePrice()
if gov.logger != nil {
gov.logger.Info("cgov: will monitor token:", zap.Stringer("chain", key.chain),
zap.Stringer("addr", key.addr),
zap.String("symbol", te.symbol),
zap.String("coinGeckoId", te.coinGeckoId),
zap.String("price", te.price.String()),
zap.Int64("decimals", dec),
zap.Int64("origDecimals", ct.decimals),
)
}
gov.logger.Info("cgov: will monitor token:", zap.Stringer("chain", key.chain),
zap.Stringer("addr", key.addr),
zap.String("symbol", te.symbol),
zap.String("coinGeckoId", te.coinGeckoId),
zap.String("price", te.price.String()),
zap.Int64("decimals", dec),
zap.Int64("origDecimals", ct.decimals),
)
gov.tokens[key] = te
gov.tokensByCoinGeckoId[te.coinGeckoId] = te
@ -229,11 +225,9 @@ func (gov *ChainGovernor) initConfig() error {
ce := &chainEntry{emitterChainId: cc.emitterChainID, emitterAddr: emitterAddr, dailyLimit: cc.dailyLimit}
if gov.logger != nil {
gov.logger.Info("cgov: will monitor chain:", zap.Stringer("emitterChainId", cc.emitterChainID),
zap.Stringer("emitterAddr", ce.emitterAddr),
zap.String("dailyLimit", fmt.Sprint(ce.dailyLimit)))
}
gov.logger.Info("cgov: will monitor chain:", zap.Stringer("emitterChainId", cc.emitterChainID),
zap.Stringer("emitterAddr", ce.emitterAddr),
zap.String("dailyLimit", fmt.Sprint(ce.dailyLimit)))
gov.chains[cc.emitterChainID] = ce
}
@ -249,9 +243,7 @@ func (gov *ChainGovernor) initConfig() error {
func (gov *ChainGovernor) ProcessMsg(msg *common.MessagePublication) bool {
publish, err := gov.ProcessMsgForTime(msg, time.Now())
if err != nil {
if gov.logger != nil {
gov.logger.Error("cgov: failed to process VAA: %v", zap.Error(err))
}
gov.logger.Error("cgov: failed to process VAA: %v", zap.Error(err))
return false
}
@ -298,9 +290,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
if err != nil {
if gov.logger != nil {
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
}
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
return false, err
}
@ -311,42 +301,37 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
}
newTotalValue := prevTotalValue + value
if newTotalValue < prevTotalValue {
return false, fmt.Errorf("total value has overflowed")
}
if newTotalValue > ce.dailyLimit {
if gov.logger != nil {
gov.logger.Error("cgov: enqueuing vaa because it would exceed the daily limit",
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", msg.MessageIDString()))
}
gov.logger.Error("cgov: enqueuing vaa because it would exceed the daily limit",
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", msg.MessageIDString()))
ce.pending = append(ce.pending, pendingEntry{timeStamp: now, token: token, amount: payload.Amount, msg: msg})
if gov.db != nil {
err = gov.db.StorePendingMsg(msg)
if err != nil {
return false, err
}
err = gov.db.StorePendingMsg(msg)
if err != nil {
return false, err
}
return false, nil
}
if gov.logger != nil {
gov.logger.Info("cgov: posting vaa",
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", msg.MessageIDString()))
}
gov.logger.Info("cgov: posting vaa",
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", msg.MessageIDString()))
xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msg.MessageIDString()}
ce.transfers = append(ce.transfers, xfer)
if gov.db != nil {
err = gov.db.StoreTransfer(&xfer)
if err != nil {
return false, err
}
err = gov.db.StoreTransfer(&xfer)
if err != nil {
return false, err
}
return true, nil
@ -360,6 +345,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
gov.mutex.Lock()
defer gov.mutex.Unlock()
// Note: Using Add() with a negative value because Sub() takes a time and returns a duration, which is not what we want.
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
var msgsToPublish []*common.MessagePublication
@ -375,61 +361,59 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
foundOne := false
prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
if err != nil {
if gov.logger != nil {
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
}
return msgsToPublish, err
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
gov.msgsToPublish = msgsToPublish
return nil, err
}
// Keep going until we find something that fits or hit the end.
for idx, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token)
if err != nil {
if gov.logger != nil {
gov.logger.Error("cgov: failed to compute value for pending vaa",
zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price),
zap.String("msgID", pe.msg.MessageIDString()),
zap.Error(err),
)
}
gov.logger.Error("cgov: failed to compute value for pending vaa",
zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price),
zap.String("msgID", pe.msg.MessageIDString()),
zap.Error(err),
)
return msgsToPublish, err
gov.msgsToPublish = msgsToPublish
return nil, err
}
newTotalValue := prevTotalValue + value
if newTotalValue < prevTotalValue {
gov.msgsToPublish = msgsToPublish
return nil, fmt.Errorf("total value has overflowed")
}
if newTotalValue > ce.dailyLimit {
// This one won't fit. Keep checking other enqueued ones.
continue
}
// If we get here, we found something that fits. Publish it and remove it from the pending list.
if gov.logger != nil {
gov.logger.Info("cgov: posting pending vaa",
zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price),
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", pe.msg.MessageIDString()))
}
gov.logger.Info("cgov: posting pending vaa",
zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price),
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", pe.msg.MessageIDString()))
msgsToPublish = append(msgsToPublish, pe.msg)
xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr, EmitterChain: pe.msg.EmitterChain, EmitterAddress: pe.msg.EmitterAddress, MsgID: pe.msg.MessageIDString()}
ce.transfers = append(ce.transfers, xfer)
if gov.db != nil {
if err := gov.db.StoreTransfer(&xfer); err != nil {
return msgsToPublish, err
}
if err := gov.db.StoreTransfer(&xfer); err != nil {
gov.msgsToPublish = msgsToPublish
return nil, err
}
if gov.db != nil {
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
return msgsToPublish, err
}
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
gov.msgsToPublish = msgsToPublish
return nil, err
}
ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...)
@ -465,12 +449,12 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
return value, nil
}
func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db *db.Database) (sum uint64, err error) {
func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db db.GovernorDB) (sum uint64, err error) {
sum, ce.transfers, err = TrimAndSumValue(ce.transfers, startTime, db)
return sum, err
}
func TrimAndSumValue(transfers []db.Transfer, startTime time.Time, db *db.Database) (uint64, []db.Transfer, error) {
func TrimAndSumValue(transfers []db.Transfer, startTime time.Time, db db.GovernorDB) (uint64, []db.Transfer, error) {
if len(transfers) == 0 {
return 0, transfers, nil
}

View File

@ -142,10 +142,8 @@ func (gov *ChainGovernor) DropPendingVAA(vaaId string) (string, error) {
zap.Stringer("timeStamp", pe.timeStamp),
)
if gov.db != nil {
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
return "", err
}
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
return "", err
}
ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...)
@ -176,13 +174,11 @@ func (gov *ChainGovernor) ReleasePendingVAA(vaaId string) (string, error) {
gov.msgsToPublish = append(gov.msgsToPublish, pe.msg)
if gov.db != nil {
// We delete the pending message from the database, but we don't add it to the transfers
// because released messages do not apply to the limit.
// We delete the pending message from the database, but we don't add it to the transfers
// because released messages do not apply to the limit.
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
return "", err
}
if err := gov.db.DeletePendingMsg(pe.msg); err != nil {
return "", err
}
ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...)

View File

@ -2,7 +2,7 @@
//
// The initial prices are read from the static config (tokens.go). After that, prices are
// queried from CoinGecko. The chain governor then uses the maximum of the static price and
// the latest CoinGecko price.
// the latest CoinGecko price. The CoinGecko poll interval is specified by coinGeckoQueryIntervalInMins.
package governor
@ -16,10 +16,14 @@ import (
"time"
"go.uber.org/zap"
"github.com/certusone/wormhole/node/pkg/supervisor"
)
// An example of the query to be generated: https://api.coingecko.com/api/v3/simple/price?ids=gemma-extending-tech,bitcoin,weth&vs_currencies=usd
const coinGeckoQueryIntervalInMins = 5
func (gov *ChainGovernor) initCoinGecko(ctx context.Context) error {
ids := ""
first := true
@ -38,40 +42,43 @@ func (gov *ChainGovernor) initCoinGecko(ctx context.Context) error {
params.Add("vs_currencies", "usd")
if first {
if gov.logger != nil {
gov.logger.Info("cgov: did not find any securities, nothing to do!")
}
gov.logger.Info("cgov: did not find any tokens, nothing to do!")
return nil
}
gov.coinGeckoQuery = "https://api.coingecko.com/api/v3/simple/price?" + params.Encode()
gov.logger.Info("cgov: coingecko query: ", zap.String("query", gov.coinGeckoQuery))
if gov.logger != nil {
gov.logger.Info("cgov: coingecko query: ", zap.String("query", gov.coinGeckoQuery))
if err := supervisor.Run(ctx, "govpricer", gov.PriceQuery); err != nil {
return err
}
timer := time.NewTimer(time.Millisecond) // Start immediately.
go func() {
for {
select {
case <-ctx.Done():
return
case <-timer.C:
gov.queryCoinGecko()
timer = time.NewTimer(time.Duration(5) * time.Minute)
}
}
}()
return nil
}
func (gov *ChainGovernor) PriceQuery(ctx context.Context) error {
// Do a query immediately, then once each interval.
gov.queryCoinGecko()
ticker := time.NewTicker(time.Duration(coinGeckoQueryIntervalInMins) * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
gov.queryCoinGecko()
}
}
}
// This does not return an error. Instead, it just logs the error and we will try again five minutes later.
func (gov *ChainGovernor) queryCoinGecko() {
response, err := http.Get(gov.coinGeckoQuery)
if err != nil {
gov.logger.Error("cgov: failed to query coin gecko", zap.String("query", gov.coinGeckoQuery), zap.Error(err))
gov.logger.Error("cgov: failed to query coin gecko, reverting to configured prices", zap.String("query", gov.coinGeckoQuery), zap.Error(err))
gov.revertAllPrices()
return
}
@ -79,18 +86,22 @@ func (gov *ChainGovernor) queryCoinGecko() {
err = response.Body.Close()
if err != nil {
gov.logger.Error("cgov: failed to close coin gecko query")
// We can't safely call revertAllPrices() here because we don't know if we hold the lock or not.
// Also, we don't need to because the prices have already been updated / reverted by this point.
}
}()
responseData, err := ioutil.ReadAll(response.Body)
if err != nil {
gov.logger.Error("cgov: failed to parse coin gecko response", zap.Error(err))
gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured prices", zap.Error(err))
gov.revertAllPrices()
return
}
var result map[string]interface{}
if err := json.Unmarshal(responseData, &result); err != nil {
gov.logger.Error("cgov: failed to unmarshal coin gecko json", zap.Error(err))
gov.logger.Error("cgov: failed to unmarshal coin gecko json, reverting to configured prices", zap.Error(err))
gov.revertAllPrices()
return
}
@ -98,12 +109,18 @@ func (gov *ChainGovernor) queryCoinGecko() {
gov.mutex.Lock()
defer gov.mutex.Unlock()
localTokenMap := make(map[string]*tokenEntry)
for coinGeckoId, te := range gov.tokensByCoinGeckoId {
localTokenMap[coinGeckoId] = te
}
for coinGeckoId, data := range result {
te, exists := gov.tokensByCoinGeckoId[coinGeckoId]
if exists {
price, ok := data.(map[string]interface{})["usd"].(float64)
if !ok {
gov.logger.Error("cgov: failed to parse coin gecko response", zap.String("coinGeckoId", coinGeckoId))
gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId))
// By continuing, we leave this one in the local map so the price will get reverted below.
continue
}
te.coinGeckoPrice = big.NewFloat(price)
@ -118,8 +135,43 @@ func (gov *ChainGovernor) queryCoinGecko() {
zap.Stringer("cfgPrice", te.cfgPrice),
zap.Stringer("coinGeckoPrice", te.coinGeckoPrice),
)
delete(localTokenMap, coinGeckoId)
} else {
gov.logger.Error("cgov: received a CoinGecko response for an unexpected symbol", zap.String("coinGeckoId", coinGeckoId))
}
}
if len(localTokenMap) != 0 {
for _, te := range localTokenMap {
gov.logger.Error("cgov: did not receive a CoinGecko response for symbol, reverting to configured price",
zap.String("symbol", te.symbol),
zap.String("coinGeckoId",
te.coinGeckoId),
zap.Stringer("cfgPrice", te.cfgPrice),
)
te.price = te.cfgPrice
// Don't update the timestamp so we'll know when we last received an update from CoinGecko.
}
}
}
func (gov *ChainGovernor) revertAllPrices() {
gov.mutex.Lock()
defer gov.mutex.Unlock()
for _, te := range gov.tokensByCoinGeckoId {
gov.logger.Error("cgov: reverting to configured price",
zap.String("symbol", te.symbol),
zap.String("coinGeckoId",
te.coinGeckoId),
zap.Stringer("cfgPrice", te.cfgPrice),
)
te.price = te.cfgPrice
// Don't update the timestamp so we'll know when we last received an update from CoinGecko.
}
}
// We should use the max(coinGeckoPrice, configuredPrice) as our price for computing notional value.

View File

@ -16,6 +16,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/vaa"
"go.uber.org/zap"
)
// This is so we can have consistent config data for unit tests.
@ -211,7 +212,9 @@ func newChainGovernorForTest(ctx context.Context) (*ChainGovernor, error) {
return nil, fmt.Errorf("ctx is nil")
}
gov := NewChainGovernor(nil, nil, GoTestMode)
logger := zap.NewNop()
var db db.MockGovernorDB
gov := NewChainGovernor(logger, &db, GoTestMode)
err := gov.Run(ctx)
if err != nil {

View File

@ -7,9 +7,7 @@ import (
)
func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []chainConfigEntry) {
if gov.logger != nil {
gov.logger.Info("cgov: setting up testnet config")
}
gov.logger.Info("cgov: setting up testnet config")
tokens := []tokenConfigEntry{
tokenConfigEntry{chain: 1, addr: "069b8857feab8184fb687f634618c035dac439dc1aeb3b5598a0f00000000001", symbol: "SOL", coinGeckoId: "wrapped-solana", decimals: 8, price: 34.94}, // Addr: So11111111111111111111111111111111111111112, Notional: 4145006

View File

@ -223,6 +223,11 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa
networks = append(networks, v)
}
features := make([]string, 0)
if gov != nil {
features = append(features, "governor")
}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,
Counter: ctr,
@ -231,6 +236,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa
Version: version.Version(),
GuardianAddr: DefaultRegistry.guardianAddress,
BootTimestamp: bootTime.UnixNano(),
Features: features,
}
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)

View File

@ -55,6 +55,9 @@ message Heartbeat {
// UNIX boot timestamp.
int64 boot_timestamp = 7;
// List of features enabled on this node.
repeated string features = 8;
}
// A SignedObservation is a signed statement by a given guardian node