Node/Gov: Use component logging (#2667)

This commit is contained in:
bruce-riley 2023-04-18 11:38:24 -05:00 committed by GitHub
parent 90dce6161b
commit 240230bd36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 66 additions and 70 deletions

View File

@ -351,7 +351,7 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
if len(oldPendingToUpdate) != 0 { if len(oldPendingToUpdate) != 0 {
for _, pending := range oldPendingToUpdate { for _, pending := range oldPendingToUpdate {
logger.Info("cgov: updating format of database entry for pending vaa", zap.String("msgId", pending.Msg.MessageIDString())) logger.Info("updating format of database entry for pending vaa", zap.String("msgId", pending.Msg.MessageIDString()))
err := d.StorePendingMsg(pending) err := d.StorePendingMsg(pending)
if err != nil { if err != nil {
return fmt.Errorf("failed to write new pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err) return fmt.Errorf("failed to write new pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err)
@ -369,7 +369,7 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
if len(oldTransfers) != 0 { if len(oldTransfers) != 0 {
for _, xfer := range oldTransfers { for _, xfer := range oldTransfers {
logger.Info("cgov: updating format of database entry for completed transfer", zap.String("msgId", xfer.MsgID)) logger.Info("updating format of database entry for completed transfer", zap.String("msgId", xfer.MsgID))
err := d.StoreTransfer(xfer) err := d.StoreTransfer(xfer)
if err != nil { if err != nil {
return fmt.Errorf("failed to write new completed transfer for key [%v]: %w", xfer.MsgID, err) return fmt.Errorf("failed to write new completed transfer for key [%v]: %w", xfer.MsgID, err)

View File

@ -7,7 +7,7 @@ import (
) )
func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []chainConfigEntry) { func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []chainConfigEntry) {
gov.logger.Info("cgov: setting up devnet config") gov.logger.Info("setting up devnet config")
gov.dayLengthInMinutes = 5 gov.dayLengthInMinutes = 5

View File

@ -141,7 +141,7 @@ func NewChainGovernor(
) *ChainGovernor { ) *ChainGovernor {
return &ChainGovernor{ return &ChainGovernor{
db: db, db: db,
logger: logger, logger: logger.With(zap.String("component", "cgov")),
tokens: make(map[tokenKey]*tokenEntry), tokens: make(map[tokenKey]*tokenEntry),
tokensByCoinGeckoId: make(map[string][]*tokenEntry), tokensByCoinGeckoId: make(map[string][]*tokenEntry),
chains: make(map[vaa.ChainID]*chainEntry), chains: make(map[vaa.ChainID]*chainEntry),
@ -151,7 +151,7 @@ func NewChainGovernor(
} }
func (gov *ChainGovernor) Run(ctx context.Context) error { func (gov *ChainGovernor) Run(ctx context.Context) error {
gov.logger.Info("cgov: starting chain governor") gov.logger.Info("starting chain governor")
if err := gov.initConfig(); err != nil { if err := gov.initConfig(); err != nil {
return err return err
@ -224,7 +224,7 @@ func (gov *ChainGovernor) initConfig() error {
gov.tokensByCoinGeckoId[te.coinGeckoId] = cge gov.tokensByCoinGeckoId[te.coinGeckoId] = cge
} }
gov.logger.Info("cgov: will monitor token:", zap.Stringer("chain", key.chain), gov.logger.Info("will monitor token:", zap.Stringer("chain", key.chain),
zap.Stringer("addr", key.addr), zap.Stringer("addr", key.addr),
zap.String("symbol", te.symbol), zap.String("symbol", te.symbol),
zap.String("coinGeckoId", te.coinGeckoId), zap.String("coinGeckoId", te.coinGeckoId),
@ -267,7 +267,7 @@ func (gov *ChainGovernor) initConfig() error {
checkForBigTransactions: cc.bigTransactionSize != 0, checkForBigTransactions: cc.bigTransactionSize != 0,
} }
gov.logger.Info("cgov: will monitor chain:", zap.Stringer("emitterChainId", cc.emitterChainID), gov.logger.Info("will monitor chain:", zap.Stringer("emitterChainId", cc.emitterChainID),
zap.Stringer("emitterAddr", ce.emitterAddr), zap.Stringer("emitterAddr", ce.emitterAddr),
zap.String("dailyLimit", fmt.Sprint(ce.dailyLimit)), zap.String("dailyLimit", fmt.Sprint(ce.dailyLimit)),
zap.Uint64("bigTransactionSize", ce.bigTransactionSize), zap.Uint64("bigTransactionSize", ce.bigTransactionSize),
@ -288,7 +288,7 @@ func (gov *ChainGovernor) initConfig() error {
func (gov *ChainGovernor) ProcessMsg(msg *common.MessagePublication) bool { func (gov *ChainGovernor) ProcessMsg(msg *common.MessagePublication) bool {
publish, err := gov.ProcessMsgForTime(msg, time.Now()) publish, err := gov.ProcessMsgForTime(msg, time.Now())
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to process VAA: %v", zap.Error(err)) gov.logger.Error("failed to process VAA: %v", zap.Error(err))
return false return false
} }
@ -316,7 +316,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
xferComplete, alreadySeen := gov.msgsSeen[hash] xferComplete, alreadySeen := gov.msgsSeen[hash]
if alreadySeen { if alreadySeen {
if !xferComplete { if !xferComplete {
gov.logger.Info("cgov: ignoring duplicate vaa because it is enqueued", gov.logger.Info("ignoring duplicate vaa because it is enqueued",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -324,7 +324,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
return false, nil return false, nil
} }
gov.logger.Info("cgov: allowing duplicate vaa to be published again, but not adding it to the notional value", gov.logger.Info("allowing duplicate vaa to be published again, but not adding it to the notional value",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -335,7 +335,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to trim transfers", gov.logger.Error("failed to trim transfers",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -346,7 +346,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
value, err := computeValue(payload.Amount, token) value, err := computeValue(payload.Amount, token)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to compute value of transfer", gov.logger.Error("failed to compute value of transfer",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -357,7 +357,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
newTotalValue := prevTotalValue + value newTotalValue := prevTotalValue + value
if newTotalValue < prevTotalValue { if newTotalValue < prevTotalValue {
gov.logger.Error("cgov: total value has overflowed", gov.logger.Error("total value has overflowed",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -372,7 +372,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
if ce.isBigTransfer(value) { if ce.isBigTransfer(value) {
enqueueIt = true enqueueIt = true
releaseTime = now.Add(maxEnqueuedTime) releaseTime = now.Add(maxEnqueuedTime)
gov.logger.Error("cgov: enqueuing vaa because it is a big transaction", gov.logger.Error("enqueuing vaa because it is a big transaction",
zap.Uint64("value", value), zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue), zap.Uint64("newTotalValue", newTotalValue),
@ -385,7 +385,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
} else if newTotalValue > ce.dailyLimit { } else if newTotalValue > ce.dailyLimit {
enqueueIt = true enqueueIt = true
releaseTime = now.Add(maxEnqueuedTime) releaseTime = now.Add(maxEnqueuedTime)
gov.logger.Error("cgov: enqueuing vaa because it would exceed the daily limit", gov.logger.Error("enqueuing vaa because it would exceed the daily limit",
zap.Uint64("value", value), zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue), zap.Uint64("newTotalValue", newTotalValue),
@ -400,7 +400,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg} dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg}
err = gov.db.StorePendingMsg(&dbData) err = gov.db.StorePendingMsg(&dbData)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to store pending vaa", gov.logger.Error("failed to store pending vaa",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.String("hash", hash),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -414,7 +414,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
return false, nil return false, nil
} }
gov.logger.Info("cgov: posting vaa", gov.logger.Info("posting vaa",
zap.Uint64("value", value), zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue), zap.Uint64("newTotalValue", newTotalValue),
@ -434,7 +434,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
} }
err = gov.db.StoreTransfer(&xfer) err = gov.db.StoreTransfer(&xfer)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to store transfer", gov.logger.Error("failed to store transfer",
zap.String("msgID", msg.MessageIDString()), zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.Error(err), zap.String("hash", hash), zap.Error(err),
zap.Stringer("txHash", msg.TxHash), zap.Stringer("txHash", msg.TxHash),
@ -461,26 +461,26 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked(msg *common.MessagePublication)
ce, exists := gov.chains[msg.EmitterChain] ce, exists := gov.chains[msg.EmitterChain]
if !exists { if !exists {
if msg.EmitterChain != vaa.ChainIDPythNet { if msg.EmitterChain != vaa.ChainIDPythNet {
gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msg.MessageIDString())) gov.logger.Info("ignoring vaa because the emitter chain is not configured", zap.String("msgID", msg.MessageIDString()))
} }
return false, nil, nil, nil, nil return false, nil, nil, nil, nil
} }
// If we don't care about this emitter, the VAA can be published. // If we don't care about this emitter, the VAA can be published.
if msg.EmitterAddress != ce.emitterAddr { if msg.EmitterAddress != ce.emitterAddr {
gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msg.MessageIDString())) gov.logger.Info("ignoring vaa because the emitter address is not configured", zap.String("msgID", msg.MessageIDString()))
return false, nil, nil, nil, nil return false, nil, nil, nil, nil
} }
// We only care about transfers. // We only care about transfers.
if !vaa.IsTransfer(msg.Payload) { if !vaa.IsTransfer(msg.Payload) {
gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msg.MessageIDString())) gov.logger.Info("ignoring vaa because it is not a transfer", zap.String("msgID", msg.MessageIDString()))
return false, nil, nil, nil, nil return false, nil, nil, nil, nil
} }
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload) payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) gov.logger.Error("failed to decode vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
return false, nil, nil, nil, err return false, nil, nil, nil, err
} }
@ -488,7 +488,7 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked(msg *common.MessagePublication)
tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress} tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
token, exists := gov.tokens[tk] token, exists := gov.tokens[tk]
if !exists { if !exists {
gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msg.MessageIDString())) gov.logger.Info("ignoring vaa because the token is not in the list", zap.String("msgID", msg.MessageIDString()))
return false, nil, nil, nil, nil return false, nil, nil, nil, nil
} }
@ -508,7 +508,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
var msgsToPublish []*common.MessagePublication var msgsToPublish []*common.MessagePublication
if len(gov.msgsToPublish) != 0 { if len(gov.msgsToPublish) != 0 {
gov.logger.Info("cgov: posting released vaas", zap.Int("num", len(gov.msgsToPublish))) gov.logger.Info("posting released vaas", zap.Int("num", len(gov.msgsToPublish)))
msgsToPublish = gov.msgsToPublish msgsToPublish = gov.msgsToPublish
gov.msgsToPublish = nil gov.msgsToPublish = nil
} }
@ -519,7 +519,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
foundOne := false foundOne := false
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err)) gov.logger.Error("failed to trim transfers", zap.Error(err))
gov.msgsToPublish = msgsToPublish gov.msgsToPublish = msgsToPublish
return nil, err return nil, err
} }
@ -528,7 +528,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
for idx, pe := range ce.pending { for idx, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token) value, err := computeValue(pe.amount, pe.token)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to compute value for pending vaa", gov.logger.Error("failed to compute value for pending vaa",
zap.Stringer("amount", pe.amount), zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price), zap.Stringer("price", pe.token.price),
zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.String("msgID", pe.dbData.Msg.MessageIDString()),
@ -546,7 +546,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
} }
countsTowardsTransfers = false countsTowardsTransfers = false
gov.logger.Info("cgov: posting pending big vaa because the release time has been reached", gov.logger.Info("posting pending big vaa because the release time has been reached",
zap.Stringer("amount", pe.amount), zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price), zap.Stringer("price", pe.token.price),
zap.Uint64("value", value), zap.Uint64("value", value),
@ -554,7 +554,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
zap.String("msgID", pe.dbData.Msg.MessageIDString())) zap.String("msgID", pe.dbData.Msg.MessageIDString()))
} else if now.After(pe.dbData.ReleaseTime) { } else if now.After(pe.dbData.ReleaseTime) {
countsTowardsTransfers = false countsTowardsTransfers = false
gov.logger.Info("cgov: posting pending vaa because the release time has been reached", gov.logger.Info("posting pending vaa because the release time has been reached",
zap.Stringer("amount", pe.amount), zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price), zap.Stringer("price", pe.token.price),
zap.Uint64("value", value), zap.Uint64("value", value),
@ -572,7 +572,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
continue continue
} }
gov.logger.Info("cgov: posting pending vaa", gov.logger.Info("posting pending vaa",
zap.Stringer("amount", pe.amount), zap.Stringer("amount", pe.amount),
zap.Stringer("price", pe.token.price), zap.Stringer("price", pe.token.price),
zap.Uint64("value", value), zap.Uint64("value", value),

View File

@ -21,7 +21,7 @@ func (gov *ChainGovernor) loadFromDB() error {
func (gov *ChainGovernor) loadFromDBAlreadyLocked() error { func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
xfers, pending, err := gov.db.GetChainGovernorData(gov.logger) xfers, pending, err := gov.db.GetChainGovernorData(gov.logger)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to reload transactions from db", zap.Error(err)) gov.logger.Error("failed to reload transactions from db", zap.Error(err))
return err return err
} }
@ -60,7 +60,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
msg := &pending.Msg msg := &pending.Msg
ce, exists := gov.chains[msg.EmitterChain] ce, exists := gov.chains[msg.EmitterChain]
if !exists { if !exists {
gov.logger.Error("cgov: reloaded pending transfer for unsupported chain, dropping it", gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -74,7 +74,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
} }
if msg.EmitterAddress != ce.emitterAddr { if msg.EmitterAddress != ce.emitterAddr {
gov.logger.Error("cgov: reloaded pending transfer for unsupported emitter address, dropping it", gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -89,7 +89,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload) payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to parse payload for reloaded pending transfer, dropping it", gov.logger.Error("failed to parse payload for reloaded pending transfer, dropping it",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -108,7 +108,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress} tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
token, exists := gov.tokens[tk] token, exists := gov.tokens[tk]
if !exists { if !exists {
gov.logger.Error("cgov: reloaded pending transfer for unsupported token, dropping it", gov.logger.Error("reloaded pending transfer for unsupported token, dropping it",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -126,7 +126,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
hash := gov.HashFromMsg(msg) hash := gov.HashFromMsg(msg)
if _, alreadyExists := gov.msgsSeen[hash]; alreadyExists { if _, alreadyExists := gov.msgsSeen[hash]; alreadyExists {
gov.logger.Error("cgov: not reloading pending transfer because it is a duplicate", gov.logger.Error("not reloading pending transfer because it is a duplicate",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -141,7 +141,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
return return
} }
gov.logger.Info("cgov: reloaded pending transfer", gov.logger.Info("reloaded pending transfer",
zap.String("MsgID", msg.MessageIDString()), zap.String("MsgID", msg.MessageIDString()),
zap.Stringer("TxHash", msg.TxHash), zap.Stringer("TxHash", msg.TxHash),
zap.Stringer("Timestamp", msg.Timestamp), zap.Stringer("Timestamp", msg.Timestamp),
@ -161,7 +161,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) { func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) {
ce, exists := gov.chains[xfer.EmitterChain] ce, exists := gov.chains[xfer.EmitterChain]
if !exists { if !exists {
gov.logger.Error("cgov: reloaded transfer for unsupported chain, dropping it", gov.logger.Error("reloaded transfer for unsupported chain, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("EmitterChain", xfer.EmitterChain), zap.Stringer("EmitterChain", xfer.EmitterChain),
@ -172,7 +172,7 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
} }
if xfer.EmitterAddress != ce.emitterAddr { if xfer.EmitterAddress != ce.emitterAddr {
gov.logger.Error("cgov: reloaded transfer for unsupported emitter address, dropping it", gov.logger.Error("reloaded transfer for unsupported emitter address, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain), zap.Stringer("OriginChain", xfer.OriginChain),
@ -185,7 +185,7 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
tk := tokenKey{chain: xfer.OriginChain, addr: xfer.OriginAddress} tk := tokenKey{chain: xfer.OriginChain, addr: xfer.OriginAddress}
_, exists = gov.tokens[tk] _, exists = gov.tokens[tk]
if !exists { if !exists {
gov.logger.Error("cgov: reloaded transfer for unsupported token, dropping it", gov.logger.Error("reloaded transfer for unsupported token, dropping it",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain), zap.Stringer("OriginChain", xfer.OriginChain),
@ -196,7 +196,7 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
} }
if _, alreadyExists := gov.msgsSeen[xfer.Hash]; alreadyExists { if _, alreadyExists := gov.msgsSeen[xfer.Hash]; alreadyExists {
gov.logger.Info("cgov: not reloading transfer because it is a duplicate", gov.logger.Info("not reloading transfer because it is a duplicate",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain), zap.Stringer("OriginChain", xfer.OriginChain),
@ -208,7 +208,7 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
} }
if xfer.Hash != "" { if xfer.Hash != "" {
gov.logger.Info("cgov: reloaded transfer", gov.logger.Info("reloaded transfer",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain), zap.Stringer("OriginChain", xfer.OriginChain),
@ -219,7 +219,7 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
gov.msgsSeen[xfer.Hash] = transferComplete gov.msgsSeen[xfer.Hash] = transferComplete
} else { } else {
gov.logger.Error("cgov: reloaded transfer that does not have a hash, will not be able to detect a duplicate", gov.logger.Error("reloaded transfer that does not have a hash, will not be able to detect a duplicate",
zap.Stringer("Timestamp", xfer.Timestamp), zap.Stringer("Timestamp", xfer.Timestamp),
zap.Uint64("Value", xfer.Value), zap.Uint64("Value", xfer.Value),
zap.Stringer("OriginChain", xfer.OriginChain), zap.Stringer("OriginChain", xfer.OriginChain),

View File

@ -105,16 +105,14 @@ func (gov *ChainGovernor) Status() string {
for _, ce := range gov.chains { for _, ce := range gov.chains {
valueTrans := sumValue(ce.transfers, startTime) valueTrans := sumValue(ce.transfers, startTime)
s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, valueTrans, len(ce.pending)) s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, valueTrans, len(ce.pending))
s2 := fmt.Sprintf("cgov: %v", s1)
resp += s1 + "\n" resp += s1 + "\n"
gov.logger.Info(s2) gov.logger.Info(s1)
if len(ce.pending) != 0 { if len(ce.pending) != 0 {
for idx, pe := range ce.pending { for idx, pe := range ce.pending {
value, _ := computeValue(pe.amount, pe.token) value, _ := computeValue(pe.amount, pe.token)
s1 := fmt.Sprintf("chain: %v, pending[%v], value: %v, vaa: %v, timeStamp: %v, releaseTime: %v", ce.emitterChainId, idx, value, s1 := fmt.Sprintf("chain: %v, pending[%v], value: %v, vaa: %v, timeStamp: %v, releaseTime: %v", ce.emitterChainId, idx, value,
pe.dbData.Msg.MessageIDString(), pe.dbData.Msg.Timestamp.String(), pe.dbData.ReleaseTime.String()) pe.dbData.Msg.MessageIDString(), pe.dbData.Msg.Timestamp.String(), pe.dbData.ReleaseTime.String())
s2 := fmt.Sprintf("cgov: %v", s1) gov.logger.Info(s1)
gov.logger.Info(s2)
resp += " " + s1 + "\n" resp += " " + s1 + "\n"
} }
} }
@ -138,7 +136,7 @@ func (gov *ChainGovernor) Reload() (string, error) {
} }
if err := gov.loadFromDBAlreadyLocked(); err != nil { if err := gov.loadFromDBAlreadyLocked(); err != nil {
gov.logger.Error("cgov: failed to load from the database", zap.Error(err)) gov.logger.Error("failed to load from the database", zap.Error(err))
return "", err return "", err
} }
@ -155,7 +153,7 @@ func (gov *ChainGovernor) DropPendingVAA(vaaId string) (string, error) {
msgId := pe.dbData.Msg.MessageIDString() msgId := pe.dbData.Msg.MessageIDString()
if msgId == vaaId { if msgId == vaaId {
value, _ := computeValue(pe.amount, pe.token) value, _ := computeValue(pe.amount, pe.token)
gov.logger.Info("cgov: dropping pending vaa", gov.logger.Info("dropping pending vaa",
zap.String("msgId", msgId), zap.String("msgId", msgId),
zap.Uint64("value", value), zap.Uint64("value", value),
zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp), zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp),
@ -185,7 +183,7 @@ func (gov *ChainGovernor) ReleasePendingVAA(vaaId string) (string, error) {
msgId := pe.dbData.Msg.MessageIDString() msgId := pe.dbData.Msg.MessageIDString()
if msgId == vaaId { if msgId == vaaId {
value, _ := computeValue(pe.amount, pe.token) value, _ := computeValue(pe.amount, pe.token)
gov.logger.Info("cgov: releasing pending vaa, should be published soon", gov.logger.Info("releasing pending vaa, should be published soon",
zap.String("msgId", msgId), zap.String("msgId", msgId),
zap.Uint64("value", value), zap.Uint64("value", value),
zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp), zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp),
@ -224,14 +222,14 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time)
msgId := pe.dbData.Msg.MessageIDString() msgId := pe.dbData.Msg.MessageIDString()
if msgId == vaaId { if msgId == vaaId {
pe.dbData.ReleaseTime = now.Add(maxEnqueuedTime) pe.dbData.ReleaseTime = now.Add(maxEnqueuedTime)
gov.logger.Info("cgov: updating the release time due to admin command", gov.logger.Info("updating the release time due to admin command",
zap.String("msgId", msgId), zap.String("msgId", msgId),
zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp), zap.Stringer("timeStamp", pe.dbData.Msg.Timestamp),
zap.Stringer("newReleaseTime", pe.dbData.ReleaseTime), zap.Stringer("newReleaseTime", pe.dbData.ReleaseTime),
) )
if err := gov.db.StorePendingMsg(&pe.dbData); err != nil { if err := gov.db.StorePendingMsg(&pe.dbData); err != nil {
gov.logger.Error("cgov: failed to store updated pending vaa", zap.String("msgID", msgId), zap.Error(err)) gov.logger.Error("failed to store updated pending vaa", zap.String("msgID", msgId), zap.Error(err))
return "", err return "", err
} }
@ -302,7 +300,7 @@ func (gov *ChainGovernor) GetEnqueuedVAAs() []*publicrpcv1.GovernorGetEnqueuedVA
for _, pe := range ce.pending { for _, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token) value, err := computeValue(pe.amount, pe.token)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err)) gov.logger.Error("failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err))
value = 0 value = 0
} }
@ -499,7 +497,7 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b
b, err := proto.Marshal(payload) b, err := proto.Marshal(payload)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to marshal config message", zap.Error(err)) gov.logger.Error("failed to marshal config message", zap.Error(err))
return return
} }
@ -540,7 +538,7 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b
for _, pe := range ce.pending { for _, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token) value, err := computeValue(pe.amount, pe.token)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err)) gov.logger.Error("failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err))
value = 0 value = 0
} }
@ -578,7 +576,7 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b
b, err := proto.Marshal(payload) b, err := proto.Marshal(payload)
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to marshal status message", zap.Error(err)) gov.logger.Error("failed to marshal status message", zap.Error(err))
return return
} }

View File

@ -43,11 +43,11 @@ func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error {
// Create the set of queries, breaking the IDs into the appropriate size chunks. // Create the set of queries, breaking the IDs into the appropriate size chunks.
gov.coinGeckoQueries = createCoinGeckoQueries(ids, tokensPerCoinGeckoQuery) gov.coinGeckoQueries = createCoinGeckoQueries(ids, tokensPerCoinGeckoQuery)
for queryIdx, query := range gov.coinGeckoQueries { for queryIdx, query := range gov.coinGeckoQueries {
gov.logger.Info("cgov: coingecko query: ", zap.Int("queryIdx", queryIdx), zap.String("query", query)) gov.logger.Info("coingecko query: ", zap.Int("queryIdx", queryIdx), zap.String("query", query))
} }
if len(gov.coinGeckoQueries) == 0 { if len(gov.coinGeckoQueries) == 0 {
gov.logger.Info("cgov: did not find any tokens, nothing to do!") gov.logger.Info("did not find any tokens, nothing to do!")
return nil return nil
} }
@ -130,7 +130,7 @@ func (gov *ChainGovernor) queryCoinGecko() error {
for queryIdx, query := range gov.coinGeckoQueries { for queryIdx, query := range gov.coinGeckoQueries {
thisResult, err := gov.queryCoinGeckoChunk(query) thisResult, err := gov.queryCoinGeckoChunk(query)
if err != nil { if err != nil {
gov.logger.Error("cgov: CoinGecko query failed", zap.Int("queryIdx", queryIdx), zap.String("query", query), zap.Error(err)) gov.logger.Error("CoinGecko query failed", zap.Int("queryIdx", queryIdx), zap.String("query", query), zap.Error(err))
gov.revertAllPrices() gov.revertAllPrices()
return err return err
} }
@ -161,7 +161,7 @@ func (gov *ChainGovernor) queryCoinGecko() error {
var ok bool var ok bool
price, ok = m["usd"].(float64) price, ok = m["usd"].(float64)
if !ok { if !ok {
gov.logger.Error("cgov: failed to parse CoinGecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId)) gov.logger.Error("failed to parse CoinGecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId))
// By continuing, we leave this one in the local map so the price will get reverted below. // By continuing, we leave this one in the local map so the price will get reverted below.
continue continue
} }
@ -175,14 +175,14 @@ func (gov *ChainGovernor) queryCoinGecko() error {
delete(localTokenMap, coinGeckoId) delete(localTokenMap, coinGeckoId)
} else { } else {
gov.logger.Error("cgov: received a CoinGecko response for an unexpected symbol", zap.String("coinGeckoId", coinGeckoId)) gov.logger.Error("received a CoinGecko response for an unexpected symbol", zap.String("coinGeckoId", coinGeckoId))
} }
} }
if len(localTokenMap) != 0 { if len(localTokenMap) != 0 {
for _, lcge := range localTokenMap { for _, lcge := range localTokenMap {
for _, te := range lcge { for _, te := range lcge {
gov.logger.Error("cgov: did not receive a CoinGecko response for symbol, reverting to configured price", gov.logger.Error("did not receive a CoinGecko response for symbol, reverting to configured price",
zap.String("symbol", te.symbol), zap.String("symbol", te.symbol),
zap.String("coinGeckoId", zap.String("coinGeckoId",
te.coinGeckoId), te.coinGeckoId),
@ -194,7 +194,7 @@ func (gov *ChainGovernor) queryCoinGecko() error {
} }
} }
return fmt.Errorf("cgov: failed to update prices for some tokens") return fmt.Errorf("failed to update prices for some tokens")
} }
return nil return nil
@ -204,7 +204,7 @@ func (gov *ChainGovernor) queryCoinGecko() error {
func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interface{}, error) { func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interface{}, error) {
var result map[string]interface{} var result map[string]interface{}
gov.logger.Debug("cgov: executing CoinGecko query", zap.String("query", query)) gov.logger.Debug("executing CoinGecko query", zap.String("query", query))
response, err := http.Get(query) //nolint:gosec response, err := http.Get(query) //nolint:gosec
if err != nil { if err != nil {
return result, fmt.Errorf("failed to query CoinGecko: %w", err) return result, fmt.Errorf("failed to query CoinGecko: %w", err)
@ -213,7 +213,7 @@ func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interfac
defer func() { defer func() {
err = response.Body.Close() err = response.Body.Close()
if err != nil { if err != nil {
gov.logger.Error("cgov: failed to close CoinGecko query: %w", zap.Error(err)) gov.logger.Error("failed to close CoinGecko query: %w", zap.Error(err))
} }
}() }()
@ -241,7 +241,7 @@ func (gov *ChainGovernor) revertAllPrices() {
for _, cge := range gov.tokensByCoinGeckoId { for _, cge := range gov.tokensByCoinGeckoId {
for _, te := range cge { for _, te := range cge {
gov.logger.Info("cgov: reverting to configured price", gov.logger.Info("reverting to configured price",
zap.String("symbol", te.symbol), zap.String("symbol", te.symbol),
zap.String("coinGeckoId", te.coinGeckoId), zap.String("coinGeckoId", te.coinGeckoId),
zap.Stringer("cfgPrice", te.cfgPrice), zap.Stringer("cfgPrice", te.cfgPrice),

View File

@ -7,7 +7,7 @@ import (
) )
func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []chainConfigEntry) { func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []chainConfigEntry) {
gov.logger.Info("cgov: setting up testnet config") gov.logger.Info("setting up testnet config")
tokens := []tokenConfigEntry{ tokens := []tokenConfigEntry{
tokenConfigEntry{chain: 1, addr: "069b8857feab8184fb687f634618c035dac439dc1aeb3b5598a0f00000000001", symbol: "SOL", coinGeckoId: "wrapped-solana", decimals: 8, price: 34.94}, // Addr: So11111111111111111111111111111111111111112, Notional: 4145006 tokenConfigEntry{chain: 1, addr: "069b8857feab8184fb687f634618c035dac439dc1aeb3b5598a0f00000000001", symbol: "SOL", coinGeckoId: "wrapped-solana", decimals: 8, price: 34.94}, // Addr: So11111111111111111111111111111111111111112, Notional: 4145006

View File

@ -547,12 +547,10 @@ func Run(
} }
} }
case *gossipv1.GossipMessage_SignedChainGovernorConfig: case *gossipv1.GossipMessage_SignedChainGovernorConfig:
logger.Debug("cgov: received config message")
if signedGovCfg != nil { if signedGovCfg != nil {
signedGovCfg <- m.SignedChainGovernorConfig signedGovCfg <- m.SignedChainGovernorConfig
} }
case *gossipv1.GossipMessage_SignedChainGovernorStatus: case *gossipv1.GossipMessage_SignedChainGovernorStatus:
logger.Debug("cgov: received status message")
if signedGovSt != nil { if signedGovSt != nil {
signedGovSt <- m.SignedChainGovernorStatus signedGovSt <- m.SignedChainGovernorStatus
} }

View File

@ -231,9 +231,9 @@ func (p *Processor) Run(ctx context.Context) error {
for _, k := range toBePublished { for _, k := range toBePublished {
// SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message. // SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message.
if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil { if msgIsGoverned, err := p.governor.IsGovernedMsg(k); err != nil {
return fmt.Errorf("cgov: governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err) return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err)
} else if !msgIsGoverned { } else if !msgIsGoverned {
return fmt.Errorf("cgov: governor published a message that should not be governed: `%s`", k.MessageIDString()) return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString())
} }
if p.acct != nil { if p.acct != nil {
shouldPub, err := p.acct.SubmitObservation(k) shouldPub, err := p.acct.SubmitObservation(k)