diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 06bbc8e5f..9eee0bd48 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -459,22 +459,13 @@ func runNode(cmd *cobra.Command, args []string) { // In devnet mode, we automatically set a number of flags that rely on deterministic keys. if *unsafeDevMode { - // When running multiple guardians in tilt, we only see p2p traffic from others if they are also bootstrap hosts. - p2pStr := "" - for idx := 0; idx < int(*devNumGuardians); idx++ { - key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(int64(idx))) - if err != nil { - panic(err) - } - - if p2pStr != "" { - p2pStr = p2pStr + "," - } - p2pStr = p2pStr + fmt.Sprintf("/dns4/guardian-%d.guardian/udp/%d/quic/p2p/%s", idx, *p2pPort, key.String()) + g0key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(0)) + if err != nil { + panic(err) } - *p2pBootstrap = p2pStr - logger.Info("running in dev mode", zap.Uint("devNumGuardians", *devNumGuardians), zap.String("p2pBootstrap", *p2pBootstrap)) + // Use the first guardian node as bootstrap + *p2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *p2pPort, g0key.String()) // Deterministic ganache ETH devnet address. *ethContract = devnet.GanacheWormholeContractAddress.Hex() diff --git a/node/pkg/db/governor.go b/node/pkg/db/governor.go index 5bafcc7f8..61a3f92c6 100644 --- a/node/pkg/db/governor.go +++ b/node/pkg/db/governor.go @@ -264,10 +264,8 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time } key := oldPendingMsgID(&pending.Msg) - if err := d.db.Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }); err != nil { + err = d.db.DropPrefix(key) + if err != nil { return fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err) } } @@ -317,11 +315,8 @@ func (d *Database) StorePendingMsg(pending *PendingTransfer) error { // This is called by the chain governor to delete a transfer after the time limit has expired. func (d *Database) DeleteTransfer(t *Transfer) error { key := TransferMsgID(t) - - if err := d.db.Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }); err != nil { + err := d.db.DropPrefix(key) + if err != nil { return fmt.Errorf("failed to delete transfer msg for key [%v]: %w", key, err) } @@ -331,10 +326,8 @@ func (d *Database) DeleteTransfer(t *Transfer) error { // This is called by the chain governor to delete a pending transfer. func (d *Database) DeletePendingMsg(pending *PendingTransfer) error { key := PendingMsgID(&pending.Msg) - if err := d.db.Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }); err != nil { + err := d.db.DropPrefix(key) + if err != nil { return fmt.Errorf("failed to delete pending msg for key [%v]: %w", key, err) } diff --git a/node/pkg/db/governor_test.go b/node/pkg/db/governor_test.go index d9551041f..70f439f1d 100644 --- a/node/pkg/db/governor_test.go +++ b/node/pkg/db/governor_test.go @@ -15,13 +15,6 @@ import ( "go.uber.org/zap" ) -func (d *Database) rowExistsInDB(key []byte) error { - return d.db.View(func(txn *badger.Txn) error { - _, err := txn.Get(key) - return err - }) -} - func TestSerializeAndDeserializeOfTransfer(t *testing.T) { tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") require.NoError(t, err) @@ -188,14 +181,8 @@ func TestDeleteTransfer(t *testing.T) { err2 := db.StoreTransfer(xfer1) require.NoError(t, err2) - // Make sure the xfer exists in the db. - assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1))) - err3 := db.DeleteTransfer(xfer1) require.NoError(t, err3) - - // Make sure the xfer is no longer in the db. - assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(TransferMsgID(xfer1))) } func TestStorePendingMsg(t *testing.T) { @@ -253,14 +240,8 @@ func TestDeletePendingMsg(t *testing.T) { err3 := db.StorePendingMsg(pending) require.NoError(t, err3) - // Make sure the pending transfer exists in the db. - assert.NoError(t, db.rowExistsInDB(PendingMsgID(msg))) - err4 := db.DeletePendingMsg(pending) assert.Nil(t, err4) - - // Make sure the pending transfer is no longer in the db. - assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(PendingMsgID(msg))) } func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 7ceb5c400..5f9e26b60 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -46,9 +46,6 @@ const ( TestNetMode = 2 DevNetMode = 3 GoTestMode = 4 - - transferComplete = true - transferEnqueued = false ) // WARNING: Change me in ./node/db as well @@ -121,7 +118,6 @@ type ChainGovernor struct { tokens map[tokenKey]*tokenEntry tokensByCoinGeckoId map[string][]*tokenEntry chains map[vaa.ChainID]*chainEntry - msgsById map[string]bool // Use consts transferComplete and transferEnqueued. msgsToPublish []*common.MessagePublication dayLengthInMinutes int coinGeckoQuery string @@ -143,7 +139,6 @@ func NewChainGovernor( tokens: make(map[tokenKey]*tokenEntry), tokensByCoinGeckoId: make(map[string][]*tokenEntry), chains: make(map[vaa.ChainID]*chainEntry), - msgsById: make(map[string]bool), env: env, } } @@ -292,8 +287,6 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now return false, fmt.Errorf("msg is nil") } - msgId := msg.MessageIDString() - gov.mutex.Lock() defer gov.mutex.Unlock() @@ -301,25 +294,25 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now // If we don't care about this chain, the VAA can be published. if !exists { - gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msgId)) + gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msg.MessageIDString())) return true, nil } // If we don't care about this emitter, the VAA can be published. if msg.EmitterAddress != ce.emitterAddr { - gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msgId)) + gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msg.MessageIDString())) return true, nil } // We only care about transfers. if !vaa.IsTransfer(msg.Payload) { - gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msgId)) + gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msg.MessageIDString())) return true, nil } payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload) if err != nil { - gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msgId), zap.Error(err)) + gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) return true, err } @@ -327,33 +320,26 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress} token, exists := gov.tokens[tk] if !exists { - gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msgId)) + gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msg.MessageIDString())) return true, nil } - // If we've already seen this message via quorum, we can publish it. - xferComplete, exists := gov.msgsById[msgId] - if exists { - gov.logger.Info("cgov: ignoring vaa because it has already been seen", zap.String("msgID", msgId)) - return xferComplete, nil - } - startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) - prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) + prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db) if err != nil { - gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msgId), zap.Error(err)) + gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) return false, err } value, err := computeValue(payload.Amount, token) if err != nil { - gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msgId), zap.Error(err)) + gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) return false, err } newTotalValue := prevTotalValue + value if newTotalValue < prevTotalValue { - gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msgId), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue)) + gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msg.MessageIDString()), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue)) return false, fmt.Errorf("total value has overflowed") } @@ -366,7 +352,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", msgId), + zap.String("msgID", msg.MessageIDString()), zap.Stringer("releaseTime", releaseTime), zap.Uint64("bigTransactionSize", ce.bigTransactionSize), ) @@ -378,17 +364,16 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), zap.Stringer("releaseTime", releaseTime), - zap.String("msgID", msgId), + zap.String("msgID", msg.MessageIDString()), ) } if enqueueIt { dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg} ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: dbData}) - gov.msgsById[msgId] = transferEnqueued err = gov.db.StorePendingMsg(&dbData) if err != nil { - gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msgId), zap.Error(err)) + gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) return false, err } @@ -399,143 +384,19 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", msgId)) + zap.String("msgID", msg.MessageIDString())) - xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msgId} - ce.transfers = append(ce.transfers, xfer) - gov.msgsById[msgId] = transferComplete - err = gov.db.StoreTransfer(xfer) + xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msg.MessageIDString()} + ce.transfers = append(ce.transfers, &xfer) + err = gov.db.StoreTransfer(&xfer) if err != nil { - gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msgId), zap.Error(err)) + gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err)) return false, err } return true, nil } -// Note: This function only gets called once for each VAA. If the processor finds the VAA in the DB, this function will not be called. -func (gov *ChainGovernor) ProcessInboundQuorum(v *vaa.VAA) { - if v == nil { - gov.logger.Error("cgov: received inbound quorum event with nil vaa") - return - } - - msgId := v.MessageID() - - gov.mutex.Lock() - defer gov.mutex.Unlock() - - ce, exists := gov.chains[v.EmitterChain] - - // If we don't care about this chain, we can ignore this VAA. - if !exists { - gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter chain is not configured", zap.String("msgID", msgId)) - return - } - - // If we don't care about this emitter, we can ignore this VAA. - if v.EmitterAddress != ce.emitterAddr { - gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter address is not configured", zap.String("msgID", msgId)) - return - } - - // We only care about transfers. - if !vaa.IsTransfer(v.Payload) { - gov.logger.Info("cgov: ignoring incoming quorum vaa because it is not a transfer", zap.String("msgID", msgId)) - return - } - - payload, err := vaa.DecodeTransferPayloadHdr(v.Payload) - if err != nil { - gov.logger.Error("cgov: failed to decode incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err)) - return - } - - // If we don't care about this token, we can ignore this VAA. - tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress} - token, exists := gov.tokens[tk] - if !exists { - gov.logger.Info("cgov: ignoring incoming quorum vaa because the token is not in the list", zap.String("msgID", msgId)) - return - } - - // See if we have already processed this VAA. - xferComplete, exists := gov.msgsById[msgId] - if exists { - if xferComplete { - gov.logger.Info("cgov: ignoring incoming quorum vaa because we've already seen it", zap.String("msgID", msgId)) - return - } - - // If we get here, the VAA is enqueued. - foundIt := false - for _, ce := range gov.chains { - for idx, pe := range ce.pending { - if msgId == pe.dbData.Msg.MessageIDString() { - gov.logger.Info("cgov: received incoming quorum for transfer in the enqueued list, removing it from the pending list", zap.String("msgID", msgId)) - if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil { - gov.logger.Error("cgov: failed to delete pending entry", zap.String("msgId", msgId), zap.Error(err)) - // Continue on so that our data structures are accurate. - } - - ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...) - delete(gov.msgsById, msgId) - foundIt = true - break - } - } - - if foundIt { - break - } - } - - if !foundIt { - gov.logger.Error("cgov: failed to find pending entry for incoming quorum vaa, adding it to the list of transfers", zap.String("msgId", msgId)) - } - } - - value, err := computeValue(payload.Amount, token) - if err != nil { - gov.logger.Error("cgov: failed to compute value of incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err)) - return - } - - now := time.Now() - startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) - prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) - if err != nil { - gov.logger.Error("cgov: failed to trim transfers for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err)) - return - } - - newTotalValue := prevTotalValue + value - - if newTotalValue > ce.dailyLimit { - gov.logger.Error("cgov: incoming quorum vaa put us over the daily limit", - zap.Uint64("value", value), - zap.Uint64("prevTotalValue", prevTotalValue), - zap.Uint64("newTotalValue", newTotalValue), - zap.Uint64("dailyLimit", ce.dailyLimit), - zap.String("msgID", msgId)) - } else { - gov.logger.Info("cgov: adding incoming quorum vaa", - zap.Uint64("value", value), - zap.Uint64("prevTotalValue", prevTotalValue), - zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", msgId)) - } - - xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: v.EmitterChain, EmitterAddress: v.EmitterAddress, MsgID: msgId} - ce.transfers = append(ce.transfers, xfer) - gov.msgsById[msgId] = transferComplete - err = gov.db.StoreTransfer(xfer) - if err != nil { - gov.logger.Error("cgov: failed to store transfer for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err)) - return - } -} - func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } @@ -558,7 +419,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP // Keep going as long as we find something that will fit. for { foundOne := false - prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) + prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db) if err != nil { gov.logger.Error("cgov: failed to trim transfers", zap.Error(err)) gov.msgsToPublish = msgsToPublish @@ -567,13 +428,12 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP // Keep going until we find something that fits or hit the end. for idx, pe := range ce.pending { - msgId := pe.dbData.Msg.MessageIDString() value, err := computeValue(pe.amount, pe.token) if err != nil { gov.logger.Error("cgov: failed to compute value for pending vaa", zap.Stringer("amount", pe.amount), zap.Stringer("price", pe.token.price), - zap.String("msgID", msgId), + zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err), ) @@ -593,7 +453,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Stringer("price", pe.token.price), zap.Uint64("value", value), zap.Stringer("releaseTime", pe.dbData.ReleaseTime), - zap.String("msgID", msgId)) + zap.String("msgID", pe.dbData.Msg.MessageIDString())) } else if now.After(pe.dbData.ReleaseTime) { countsTowardsTransfers = false gov.logger.Info("cgov: posting pending vaa because the release time has been reached", @@ -601,7 +461,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Stringer("price", pe.token.price), zap.Uint64("value", value), zap.Stringer("releaseTime", pe.dbData.ReleaseTime), - zap.String("msgID", msgId)) + zap.String("msgID", pe.dbData.Msg.MessageIDString())) } else { newTotalValue := prevTotalValue + value if newTotalValue < prevTotalValue { @@ -620,24 +480,21 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", msgId)) + zap.String("msgID", pe.dbData.Msg.MessageIDString())) } // If we get here, publish it and remove it from the pending list. msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if countsTowardsTransfers { - gov.msgsById[msgId] = transferComplete - xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr, - EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: msgId} - ce.transfers = append(ce.transfers, xfer) + xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr, + EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: pe.dbData.Msg.MessageIDString()} + ce.transfers = append(ce.transfers, &xfer) - if err := gov.db.StoreTransfer(xfer); err != nil { + if err := gov.db.StoreTransfer(&xfer); err != nil { gov.msgsToPublish = msgsToPublish return nil, err } - } else { - delete(gov.msgsById, msgId) } if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil { @@ -678,12 +535,12 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } -func (gov *ChainGovernor) TrimAndSumValueForChain(ce *chainEntry, startTime time.Time) (sum uint64, err error) { - sum, ce.transfers, err = gov.TrimAndSumValue(ce.transfers, startTime) +func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db db.GovernorDB) (sum uint64, err error) { + sum, ce.transfers, err = TrimAndSumValue(ce.transfers, startTime, db) return sum, err } -func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime time.Time) (uint64, []*db.Transfer, error) { +func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.GovernorDB) (uint64, []*db.Transfer, error) { if len(transfers) == 0 { return 0, transfers, nil } @@ -700,10 +557,11 @@ func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime ti } if trimIdx >= 0 { - for idx := 0; idx <= trimIdx; idx++ { - delete(gov.msgsById, transfers[idx].MsgID) - if err := gov.db.DeleteTransfer(transfers[idx]); err != nil { - return 0, transfers, err + if db != nil { + for idx := 0; idx <= trimIdx; idx++ { + if err := db.DeleteTransfer(transfers[idx]); err != nil { + return 0, transfers, err + } } } diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index fefef8f6e..0822d7b43 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -58,11 +58,10 @@ func (gov *ChainGovernor) loadFromDBAlreadyLocked() error { func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now time.Time) { msg := &pending.Msg - msgId := msg.MessageIDString() ce, exists := gov.chains[msg.EmitterChain] if !exists { gov.logger.Error("cgov: reloaded pending transfer for unsupported chain, dropping it", - zap.String("MsgID", msgId), + zap.String("MsgID", msg.MessageIDString()), zap.Stringer("TxHash", msg.TxHash), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), @@ -76,7 +75,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now if msg.EmitterAddress != ce.emitterAddr { gov.logger.Error("cgov: reloaded pending transfer for unsupported emitter address, dropping it", - zap.String("MsgID", msgId), + zap.String("MsgID", msg.MessageIDString()), zap.Stringer("TxHash", msg.TxHash), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), @@ -91,7 +90,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload) if err != nil { gov.logger.Error("cgov: failed to parse payload for reloaded pending transfer, dropping it", - zap.String("MsgID", msgId), + zap.String("MsgID", msg.MessageIDString()), zap.Stringer("TxHash", msg.TxHash), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), @@ -110,7 +109,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now token, exists := gov.tokens[tk] if !exists { gov.logger.Error("cgov: reloaded pending transfer for unsupported token, dropping it", - zap.String("MsgID", msgId), + zap.String("MsgID", msg.MessageIDString()), zap.Stringer("TxHash", msg.TxHash), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), @@ -124,34 +123,19 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now return } - if _, exists := gov.msgsById[msgId]; !exists { - gov.logger.Info("cgov: reloaded pending transfer", - zap.String("MsgID", msgId), - zap.Stringer("TxHash", msg.TxHash), - zap.Stringer("Timestamp", msg.Timestamp), - zap.Uint32("Nonce", msg.Nonce), - zap.Uint64("Sequence", msg.Sequence), - zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel), - zap.Stringer("EmitterChain", msg.EmitterChain), - zap.Stringer("EmitterAddress", msg.EmitterAddress), - zap.Stringer("Amount", payload.Amount), - ) + gov.logger.Info("cgov: reloaded pending transfer", + zap.String("MsgID", msg.MessageIDString()), + zap.Stringer("TxHash", msg.TxHash), + zap.Stringer("Timestamp", msg.Timestamp), + zap.Uint32("Nonce", msg.Nonce), + zap.Uint64("Sequence", msg.Sequence), + zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel), + zap.Stringer("EmitterChain", msg.EmitterChain), + zap.Stringer("EmitterAddress", msg.EmitterAddress), + zap.Stringer("Amount", payload.Amount), + ) - ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending}) - gov.msgsById[msgId] = transferEnqueued - } else { - gov.logger.Info("cgov: dropping duplicate pending transfer", - zap.String("MsgID", msgId), - zap.Stringer("TxHash", msg.TxHash), - zap.Stringer("Timestamp", msg.Timestamp), - zap.Uint32("Nonce", msg.Nonce), - zap.Uint64("Sequence", msg.Sequence), - zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel), - zap.Stringer("EmitterChain", msg.EmitterChain), - zap.Stringer("EmitterAddress", msg.EmitterAddress), - zap.Stringer("Amount", payload.Amount), - ) - } + ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending}) } func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) { @@ -191,24 +175,13 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start return } - if _, exists := gov.msgsById[xfer.MsgID]; !exists { - gov.logger.Info("cgov: reloaded transfer", - zap.Stringer("Timestamp", xfer.Timestamp), - zap.Uint64("Value", xfer.Value), - zap.Stringer("OriginChain", xfer.OriginChain), - zap.Stringer("OriginAddress", xfer.OriginAddress), - zap.String("MsgID", xfer.MsgID), - ) + gov.logger.Info("cgov: reloaded transfer", + zap.Stringer("Timestamp", xfer.Timestamp), + zap.Uint64("Value", xfer.Value), + zap.Stringer("OriginChain", xfer.OriginChain), + zap.Stringer("OriginAddress", xfer.OriginAddress), + zap.String("MsgID", xfer.MsgID), + ) - ce.transfers = append(ce.transfers, xfer) - gov.msgsById[xfer.MsgID] = transferComplete - } else { - gov.logger.Info("cgov: dropping duplicate transfer", - zap.Stringer("Timestamp", xfer.Timestamp), - zap.Uint64("Value", xfer.Value), - zap.Stringer("OriginChain", xfer.OriginChain), - zap.Stringer("OriginAddress", xfer.OriginAddress), - zap.String("MsgID", xfer.MsgID), - ) - } + ce.transfers = append(ce.transfers, xfer) } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index bcd90df52..7ff6a2b30 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -111,27 +111,17 @@ func (gov *ChainGovernor) getStatsForAllChains() (numTrans int, valueTrans uint6 } func TestTrimEmptyTransfers(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) - assert.NotNil(t, gov) - now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") require.NoError(t, err) var transfers []*db.Transfer - sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now) + sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil) require.NoError(t, err) assert.Equal(t, uint64(0), sum) assert.Equal(t, 0, len(updatedTransfers)) } func TestSumAllFromToday(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) - assert.NotNil(t, gov) - now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") require.NoError(t, err) @@ -139,18 +129,13 @@ func TestSumAllFromToday(t *testing.T) { transferTime, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 11:00am (CST)") require.NoError(t, err) transfers = append(transfers, &db.Transfer{Value: 125000, Timestamp: transferTime}) - sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24)) + sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil) require.NoError(t, err) assert.Equal(t, uint64(125000), sum) assert.Equal(t, 1, len(updatedTransfers)) } func TestTrimOneOfTwoTransfers(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) - assert.NotNil(t, gov) - now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") require.NoError(t, err) @@ -167,18 +152,13 @@ func TestTrimOneOfTwoTransfers(t *testing.T) { transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2}) assert.Equal(t, 2, len(transfers)) - sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24)) + sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil) require.NoError(t, err) assert.Equal(t, 1, len(updatedTransfers)) assert.Equal(t, uint64(225000), sum) } func TestTrimSeveralTransfers(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) - assert.NotNil(t, gov) - now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") require.NoError(t, err) @@ -208,18 +188,13 @@ func TestTrimSeveralTransfers(t *testing.T) { assert.Equal(t, 5, len(transfers)) - sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24)) + sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil) require.NoError(t, err) assert.Equal(t, 3, len(updatedTransfers)) assert.Equal(t, uint64(465000), sum) } func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) - assert.NotNil(t, gov) - now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") require.NoError(t, err) @@ -234,7 +209,7 @@ func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) { transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2}) assert.Equal(t, 2, len(transfers)) - sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now) + sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil) require.NoError(t, err) assert.Equal(t, 0, len(updatedTransfers)) assert.Equal(t, uint64(0), sum) @@ -317,7 +292,6 @@ func TestVaaForUninterestingEmitterChain(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) } func TestVaaForUninterestingEmitterAddress(t *testing.T) { @@ -350,7 +324,6 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) } func TestVaaForUninterestingPayloadType(t *testing.T) { @@ -383,7 +356,6 @@ func TestVaaForUninterestingPayloadType(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) } // Note this method assumes 18 decimals for the amount. @@ -487,7 +459,6 @@ func TestVaaForUninterestingToken(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) } func TestTransfersUpToAndOverTheLimit(t *testing.T) { @@ -518,7 +489,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { ) // The first two transfers should be accepted. - msg1 := common.MessagePublication{ + msg := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), @@ -529,7 +500,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { Payload: payloadBytes1, } - canPost, err := gov.ProcessMsgForTime(&msg1, time.Now()) + canPost, err := gov.ProcessMsgForTime(&msg, time.Now()) require.NoError(t, err) numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() @@ -538,20 +509,8 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { assert.Equal(t, uint64(2218), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) - msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(2), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - canPost, err = gov.ProcessMsgForTime(&msg2, time.Now()) + canPost, err = gov.ProcessMsgForTime(&msg, time.Now()) require.NoError(t, err) numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() @@ -560,7 +519,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { assert.Equal(t, uint64(4436), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 2, len(gov.msgsById)) // But the third one should be queued up. payloadBytes2 := buildMockTransferPayloadBytes(1, @@ -571,18 +529,9 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { 1250, ) - msg3 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(3), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes2, - } + msg.Payload = payloadBytes2 - canPost, err = gov.ProcessMsgForTime(&msg3, time.Now()) + canPost, err = gov.ProcessMsgForTime(&msg, time.Now()) require.NoError(t, err) numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() @@ -591,21 +540,10 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { assert.Equal(t, uint64(4436), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(2218274), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) // But a small one should still go through. - msg4 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(4), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - canPost, err = gov.ProcessMsgForTime(&msg4, time.Now()) + msg.Payload = payloadBytes1 + canPost, err = gov.ProcessMsgForTime(&msg, time.Now()) require.NoError(t, err) numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() @@ -614,7 +552,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { assert.Equal(t, uint64(4436+2218), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(2218274), valuePending) - assert.Equal(t, 4, len(gov.msgsById)) } func TestPendingTransferBeingReleased(t *testing.T) { @@ -666,7 +603,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(479147), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // And so should the second. payloadBytes2 := buildMockTransferPayloadBytes(1, @@ -681,7 +617,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(2), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -698,7 +634,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 2, len(gov.msgsById)) // But the third one should be queued up. payloadBytes3 := buildMockTransferPayloadBytes(1, @@ -713,7 +648,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(3), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -730,7 +665,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(496893), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) // And so should the fourth one. payloadBytes4 := buildMockTransferPayloadBytes(1, @@ -745,7 +679,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(4), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -762,7 +696,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 2, numPending) assert.Equal(t, uint64(496893+532385), valuePending) - assert.Equal(t, 4, len(gov.msgsById)) // If we check pending before noon, nothing should happen. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") @@ -775,7 +708,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 2, numPending) assert.Equal(t, uint64(496893+532385), valuePending) - assert.Equal(t, 4, len(gov.msgsById)) // But at 3pm, the first one should drop off and the first queued one should get posted. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)") @@ -789,7 +721,6 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, uint64(488020+496893), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(532385), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) } func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { @@ -839,14 +770,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // And so should the second. msg2 := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(2), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -869,14 +799,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 2, len(gov.msgsById)) // But the third, big one should be queued up. msg3 := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(3), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -899,14 +828,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(887309), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) // A fourth, smaller, but still too big one, should get enqueued. msg4 := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(4), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -929,14 +857,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 2, numPending) assert.Equal(t, uint64(887309+177461), valuePending) - assert.Equal(t, 4, len(gov.msgsById)) // A fifth, smaller, but still too big one, should also get enqueued. msg5 := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(5), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -959,14 +886,13 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 3, numPending) assert.Equal(t, uint64(887309+177461+179236), valuePending) - assert.Equal(t, 5, len(gov.msgsById)) // A sixth, big one should also get enqueued. msg6 := common.MessagePublication{ TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), - Sequence: uint64(6), + Sequence: uint64(1), EmitterChain: vaa.ChainIDEthereum, EmitterAddress: tokenBridgeAddr, ConsistencyLevel: uint8(32), @@ -989,7 +915,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 4, numPending) assert.Equal(t, uint64(887309+177461+179236+889084), valuePending) - assert.Equal(t, 6, len(gov.msgsById)) // If we check pending before noon, nothing should happen. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") @@ -1002,7 +927,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(479147+488020), valueTrans) assert.Equal(t, 4, numPending) assert.Equal(t, uint64(887309+177461+179236+889084), valuePending) - assert.Equal(t, 6, len(gov.msgsById)) // But at 3pm, the first one should drop off. This should result in the second and third, smaller pending ones being posted, but not the two big ones. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)") @@ -1016,7 +940,6 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { assert.Equal(t, uint64(488020+177461+179236), valueTrans) assert.Equal(t, 2, numPending) assert.Equal(t, uint64(887309+889084), valuePending) - assert.Equal(t, 5, len(gov.msgsById)) } func TestMainnetConfigIsValid(t *testing.T) { @@ -1086,7 +1009,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(88730), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // And so should the second. msg2 := common.MessagePublication{ @@ -1116,7 +1038,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(88730+88730), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 2, len(gov.msgsById)) // But the third big one should get enqueued. msg3 := common.MessagePublication{ @@ -1146,7 +1067,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(88730+88730), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) // If we check pending before noon, nothing should happen. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") @@ -1160,14 +1080,12 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(88730+88730), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() assert.Equal(t, 2, numTrans) assert.Equal(t, uint64(88730+88730), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 3, len(gov.msgsById)) // But just after noon, the first one should drop off. The big pending one should not be affected. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)") @@ -1181,7 +1099,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(88730), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 2, len(gov.msgsById)) // And Just after 6pm, the second one should drop off. The big pending one should still not be affected. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 6:01pm (CST)") @@ -1195,7 +1112,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // 23 hours after the big transaction is enqueued, it should still be there. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 3, 2022 at 1:01am (CST)") @@ -1209,9 +1125,8 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) - // But then the operator resets the release time. + // // But then the operator resets the release time. _, err = gov.resetReleaseTimerForTime(msg3.MessageIDString(), now) require.NoError(t, err) @@ -1227,7 +1142,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(177461), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // But finally, a full 24hrs, it should get released. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 4, 2022 at 1:01am (CST)") @@ -1241,7 +1155,6 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) // But the big transaction should not affect the daily notional. ce, exists := gov.chains[vaa.ChainIDEthereum] @@ -1303,7 +1216,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(88730), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // If we check 23hrs later, nothing should happen. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 11:00am (CST)") @@ -1317,7 +1229,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 1, numPending) assert.Equal(t, uint64(88730), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) // But after 24hrs, it should get released. now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)") @@ -1331,7 +1242,6 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { assert.Equal(t, uint64(0), valueTrans) assert.Equal(t, 0, numPending) assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 0, len(gov.msgsById)) } func TestIsBigTransfer(t *testing.T) { @@ -1400,395 +1310,3 @@ func TestTransferPayloadTooShort(t *testing.T) { canPost := gov.ProcessMsg(&msg) assert.Equal(t, false, canPost) } - -func TestQuorumAfterTransferCompleteIsIgnored(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - - require.NoError(t, err) - assert.NotNil(t, gov) - - tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec - toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" - tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec - tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr) - require.NoError(t, err) - - gov.setDayLengthInMinutes(24 * 60) - err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0) - require.NoError(t, err) - err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62) - require.NoError(t, err) - - payloadBytes1 := buildMockTransferPayloadBytes(1, - vaa.ChainIDEthereum, - tokenAddrStr, - vaa.ChainIDPolygon, - toAddrStr, - 1.25, - ) - - msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(1), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - canPost, err := gov.ProcessMsgForTime(&msg, time.Now()) - require.NoError(t, err) - - numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() - assert.Equal(t, true, canPost) - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(2218), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) - - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: 1, - Signatures: nil, - Timestamp: msg.Timestamp, - Nonce: msg.Nonce, - EmitterChain: msg.EmitterChain, - EmitterAddress: msg.EmitterAddress, - Payload: msg.Payload, - Sequence: msg.Sequence, - ConsistencyLevel: msg.ConsistencyLevel, - } - - gov.ProcessInboundQuorum(v) - - numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(2218), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) -} - -func TestQuorumWhenEnqueuedUpdatesTheWindowAndDropsTheEnqueuedEvent(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - - require.NoError(t, err) - assert.NotNil(t, gov) - - tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec - toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" - tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec - tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr) - require.NoError(t, err) - - gov.setDayLengthInMinutes(24 * 60) - err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0) - require.NoError(t, err) - err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62) - require.NoError(t, err) - - payloadBytes1 := buildMockTransferPayloadBytes(1, - vaa.ChainIDEthereum, - tokenAddrStr, - vaa.ChainIDPolygon, - toAddrStr, - 1000, - ) - - msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(1), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - canPost, err := gov.ProcessMsgForTime(&msg, time.Now()) - require.NoError(t, err) - - numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() - assert.Equal(t, false, canPost) - assert.Equal(t, 0, numTrans) - assert.Equal(t, uint64(0), valueTrans) - assert.Equal(t, 1, numPending) - assert.Equal(t, uint64(1_774_619), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) - - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: 1, - Signatures: nil, - Timestamp: msg.Timestamp, - Nonce: msg.Nonce, - EmitterChain: msg.EmitterChain, - EmitterAddress: msg.EmitterAddress, - Payload: msg.Payload, - Sequence: msg.Sequence, - ConsistencyLevel: msg.ConsistencyLevel, - } - - gov.ProcessInboundQuorum(v) - - numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(1_774_619), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) -} - -func TestQuorumBeforeLocalMessageUpdatesTheWindow(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - - require.NoError(t, err) - assert.NotNil(t, gov) - - tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec - toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" - tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec - tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr) - require.NoError(t, err) - - gov.setDayLengthInMinutes(24 * 60) - err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0) - require.NoError(t, err) - err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62) - require.NoError(t, err) - - payloadBytes1 := buildMockTransferPayloadBytes(1, - vaa.ChainIDEthereum, - tokenAddrStr, - vaa.ChainIDPolygon, - toAddrStr, - 1.25, - ) - - msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(1), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: 1, - Signatures: nil, - Timestamp: msg.Timestamp, - Nonce: msg.Nonce, - EmitterChain: msg.EmitterChain, - EmitterAddress: msg.EmitterAddress, - Payload: msg.Payload, - Sequence: msg.Sequence, - ConsistencyLevel: msg.ConsistencyLevel, - } - - gov.ProcessInboundQuorum(v) - - numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(2218), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) -} - -func TestLocalMessageAfterQuorumIsPublishedButNotAddedToTheWindowAgain(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - - require.NoError(t, err) - assert.NotNil(t, gov) - - tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec - toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" - tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec - tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr) - require.NoError(t, err) - - gov.setDayLengthInMinutes(24 * 60) - err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0) - require.NoError(t, err) - err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62) - require.NoError(t, err) - - payloadBytes1 := buildMockTransferPayloadBytes(1, - vaa.ChainIDEthereum, - tokenAddrStr, - vaa.ChainIDPolygon, - toAddrStr, - 1.25, - ) - - msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(1), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: tokenBridgeAddr, - ConsistencyLevel: uint8(32), - Payload: payloadBytes1, - } - - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: 1, - Signatures: nil, - Timestamp: msg.Timestamp, - Nonce: msg.Nonce, - EmitterChain: msg.EmitterChain, - EmitterAddress: msg.EmitterAddress, - Payload: msg.Payload, - Sequence: msg.Sequence, - ConsistencyLevel: msg.ConsistencyLevel, - } - - gov.ProcessInboundQuorum(v) - - numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(2218), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) - - canPost, err := gov.ProcessMsgForTime(&msg, time.Now()) - require.NoError(t, err) - - numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() - assert.Equal(t, true, canPost) - assert.Equal(t, 1, numTrans) - assert.Equal(t, uint64(2218), valueTrans) - assert.Equal(t, 0, numPending) - assert.Equal(t, uint64(0), valuePending) - assert.Equal(t, 1, len(gov.msgsById)) -} - -func TestDontReloadDuplicates(t *testing.T) { - ctx := context.Background() - gov, err := newChainGovernorForTest(ctx) - - require.NoError(t, err) - assert.NotNil(t, gov) - - emitterAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec - emitterAddr, err := vaa.StringToAddress(emitterAddrStr) - require.NoError(t, err) - - tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec - tokenAddr, err := vaa.StringToAddress(tokenAddrStr) - require.NoError(t, err) - toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" - - require.NoError(t, err) - - gov.setDayLengthInMinutes(24 * 60) - err = gov.setChainForTesting(vaa.ChainIDEthereum, emitterAddrStr, 1000000, 0) - require.NoError(t, err) - err = gov.setTokenForTesting(vaa.ChainIDEthereum, emitterAddrStr, "WETH", 1774.62) - require.NoError(t, err) - - now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)") - startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) - - var xfers []*db.Transfer - - xfer1 := &db.Transfer{ - Timestamp: startTime.Add(time.Minute * 5), - Value: uint64(1000), - OriginChain: vaa.ChainIDEthereum, - OriginAddress: tokenAddr, - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: emitterAddr, - MsgID: "2/" + emitterAddrStr + "/125", - } - xfers = append(xfers, xfer1) - - xfer2 := &db.Transfer{ - Timestamp: startTime.Add(time.Minute * 5), - Value: uint64(2000), - OriginChain: vaa.ChainIDEthereum, - OriginAddress: tokenAddr, - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: emitterAddr, - MsgID: "2/" + emitterAddrStr + "/126", - } - xfers = append(xfers, xfer2) - - // Add a duplicate of each transfer - xfers = append(xfers, xfer1) - xfers = append(xfers, xfer2) - assert.Equal(t, 4, len(xfers)) - - payload1 := buildMockTransferPayloadBytes(1, - vaa.ChainIDEthereum, - tokenAddrStr, - vaa.ChainIDPolygon, - toAddrStr, - 1.25, - ) - - var pendings []*db.PendingTransfer - pending1 := &db.PendingTransfer{ - ReleaseTime: now.Add(time.Hour * 24), - Msg: common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(200), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: emitterAddr, - ConsistencyLevel: uint8(32), - Payload: payload1, - }, - } - pendings = append(pendings, pending1) - - pending2 := &db.PendingTransfer{ - ReleaseTime: now.Add(time.Hour * 24), - Msg: common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: time.Unix(int64(1654543099), 0), - Nonce: uint32(1), - Sequence: uint64(201), - EmitterChain: vaa.ChainIDEthereum, - EmitterAddress: emitterAddr, - ConsistencyLevel: uint8(32), - Payload: payload1, - }, - } - pendings = append(pendings, pending2) - - // Add a duplicate of each pending transfer - pendings = append(pendings, pending1) - pendings = append(pendings, pending2) - assert.Equal(t, 4, len(pendings)) - - for _, p := range xfers { - gov.reloadTransfer(p, now, startTime) - } - - for _, p := range pendings { - gov.reloadPendingTransfer(p, now) - } - - numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains() - assert.Equal(t, 2, numTrans) - assert.Equal(t, uint64(3000), valueTrans) - assert.Equal(t, 2, numPending) - assert.Equal(t, uint64(4436), valuePending) -} diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 96ec09069..f2b26b83e 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -8,7 +8,6 @@ import ( node_common "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/db" - "github.com/certusone/wormhole/node/pkg/governor" "github.com/mr-tron/base58" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -219,7 +218,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs } } -func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum, gov *governor.ChainGovernor) { +func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { v, err := vaa.Unmarshal(m.Vaa) if err != nil { p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message", @@ -313,8 +312,4 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos return } p.attestationEvents.ReportVAAQuorum(v) - - if gov != nil { - gov.ProcessInboundQuorum(v) - } } diff --git a/node/pkg/processor/observation_test.go b/node/pkg/processor/observation_test.go index 84a87653d..a9a6c6b98 100644 --- a/node/pkg/processor/observation_test.go +++ b/node/pkg/processor/observation_test.go @@ -50,7 +50,7 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) { processor := Processor{} processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil) + processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) // Check to see if we got an error, which we should have, // because a `gs` is not defined on processor @@ -114,7 +114,7 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) { processor.gs = &guardianSet processor.logger = observedLogger - processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil) + processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum) // Check to see if we got an error, which we should have assert.Equal(t, 1, observedLogs.Len()) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 2fa910ec8..f8613ff13 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -212,7 +212,7 @@ func (p *Processor) Run(ctx context.Context) error { case m := <-p.obsvC: p.handleObservation(ctx, m) case m := <-p.signedInC: - p.handleInboundSignedVAAWithQuorum(ctx, m, p.governor) + p.handleInboundSignedVAAWithQuorum(ctx, m) case <-p.cleanup.C: p.handleCleanup(ctx) case <-govTimer.C: