node: governor listen to quorum gossip (#1487)
* governor listen to quorum gossip * governor listen to quorum gossip * Fix build error in tests * See p2p from other guardians in devnet * Fix test broke during merge * Change delete function being used * Don't reload duplicates on startup
This commit is contained in:
parent
7fcbabe720
commit
0dbd0b6628
|
@ -440,13 +440,22 @@ func runNode(cmd *cobra.Command, args []string) {
|
|||
|
||||
// In devnet mode, we automatically set a number of flags that rely on deterministic keys.
|
||||
if *unsafeDevMode {
|
||||
g0key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(0))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// When running multiple guardians in tilt, we only see p2p traffic from others if they are also bootstrap hosts.
|
||||
p2pStr := ""
|
||||
for idx := 0; idx < int(*devNumGuardians); idx++ {
|
||||
key, err := peer.IDFromPrivateKey(devnet.DeterministicP2PPrivKeyByIndex(int64(idx)))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if p2pStr != "" {
|
||||
p2pStr = p2pStr + ","
|
||||
}
|
||||
p2pStr = p2pStr + fmt.Sprintf("/dns4/guardian-%d.guardian/udp/%d/quic/p2p/%s", idx, *p2pPort, key.String())
|
||||
}
|
||||
|
||||
// Use the first guardian node as bootstrap
|
||||
*p2pBootstrap = fmt.Sprintf("/dns4/guardian-0.guardian/udp/%d/quic/p2p/%s", *p2pPort, g0key.String())
|
||||
*p2pBootstrap = p2pStr
|
||||
logger.Info("running in dev mode", zap.Uint("devNumGuardians", *devNumGuardians), zap.String("p2pBootstrap", *p2pBootstrap))
|
||||
|
||||
// Deterministic ganache ETH devnet address.
|
||||
*ethContract = devnet.GanacheWormholeContractAddress.Hex()
|
||||
|
|
|
@ -264,8 +264,10 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time
|
|||
}
|
||||
|
||||
key := oldPendingMsgID(&pending.Msg)
|
||||
err = d.db.DropPrefix(key)
|
||||
if err != nil {
|
||||
if err := d.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Delete(key)
|
||||
return err
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pending.Msg.MessageIDString(), err)
|
||||
}
|
||||
}
|
||||
|
@ -315,8 +317,11 @@ func (d *Database) StorePendingMsg(pending *PendingTransfer) error {
|
|||
// This is called by the chain governor to delete a transfer after the time limit has expired.
|
||||
func (d *Database) DeleteTransfer(t *Transfer) error {
|
||||
key := TransferMsgID(t)
|
||||
err := d.db.DropPrefix(key)
|
||||
if err != nil {
|
||||
|
||||
if err := d.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Delete(key)
|
||||
return err
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to delete transfer msg for key [%v]: %w", key, err)
|
||||
}
|
||||
|
||||
|
@ -326,8 +331,10 @@ func (d *Database) DeleteTransfer(t *Transfer) error {
|
|||
// This is called by the chain governor to delete a pending transfer.
|
||||
func (d *Database) DeletePendingMsg(pending *PendingTransfer) error {
|
||||
key := PendingMsgID(&pending.Msg)
|
||||
err := d.db.DropPrefix(key)
|
||||
if err != nil {
|
||||
if err := d.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Delete(key)
|
||||
return err
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to delete pending msg for key [%v]: %w", key, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,13 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (d *Database) rowExistsInDB(key []byte) error {
|
||||
return d.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(key)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
|
||||
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
|
||||
require.NoError(t, err)
|
||||
|
@ -181,8 +188,14 @@ func TestDeleteTransfer(t *testing.T) {
|
|||
err2 := db.StoreTransfer(xfer1)
|
||||
require.NoError(t, err2)
|
||||
|
||||
// Make sure the xfer exists in the db.
|
||||
assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
|
||||
|
||||
err3 := db.DeleteTransfer(xfer1)
|
||||
require.NoError(t, err3)
|
||||
|
||||
// Make sure the xfer is no longer in the db.
|
||||
assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(TransferMsgID(xfer1)))
|
||||
}
|
||||
|
||||
func TestStorePendingMsg(t *testing.T) {
|
||||
|
@ -240,8 +253,14 @@ func TestDeletePendingMsg(t *testing.T) {
|
|||
err3 := db.StorePendingMsg(pending)
|
||||
require.NoError(t, err3)
|
||||
|
||||
// Make sure the pending transfer exists in the db.
|
||||
assert.NoError(t, db.rowExistsInDB(PendingMsgID(msg)))
|
||||
|
||||
err4 := db.DeletePendingMsg(pending)
|
||||
assert.Nil(t, err4)
|
||||
|
||||
// Make sure the pending transfer is no longer in the db.
|
||||
assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(PendingMsgID(msg)))
|
||||
}
|
||||
|
||||
func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
|
||||
|
|
|
@ -46,6 +46,9 @@ const (
|
|||
TestNetMode = 2
|
||||
DevNetMode = 3
|
||||
GoTestMode = 4
|
||||
|
||||
transferComplete = true
|
||||
transferEnqueued = false
|
||||
)
|
||||
|
||||
// WARNING: Change me in ./node/db as well
|
||||
|
@ -118,6 +121,7 @@ type ChainGovernor struct {
|
|||
tokens map[tokenKey]*tokenEntry
|
||||
tokensByCoinGeckoId map[string][]*tokenEntry
|
||||
chains map[vaa.ChainID]*chainEntry
|
||||
msgsById map[string]bool // Use consts transferComplete and transferEnqueued.
|
||||
msgsToPublish []*common.MessagePublication
|
||||
dayLengthInMinutes int
|
||||
coinGeckoQuery string
|
||||
|
@ -139,6 +143,7 @@ func NewChainGovernor(
|
|||
tokens: make(map[tokenKey]*tokenEntry),
|
||||
tokensByCoinGeckoId: make(map[string][]*tokenEntry),
|
||||
chains: make(map[vaa.ChainID]*chainEntry),
|
||||
msgsById: make(map[string]bool),
|
||||
env: env,
|
||||
}
|
||||
}
|
||||
|
@ -287,6 +292,8 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
return false, fmt.Errorf("msg is nil")
|
||||
}
|
||||
|
||||
msgId := msg.MessageIDString()
|
||||
|
||||
gov.mutex.Lock()
|
||||
defer gov.mutex.Unlock()
|
||||
|
||||
|
@ -294,25 +301,25 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
|
||||
// If we don't care about this chain, the VAA can be published.
|
||||
if !exists {
|
||||
gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msg.MessageIDString()))
|
||||
gov.logger.Info("cgov: ignoring vaa because the emitter chain is not configured", zap.String("msgID", msgId))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If we don't care about this emitter, the VAA can be published.
|
||||
if msg.EmitterAddress != ce.emitterAddr {
|
||||
gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msg.MessageIDString()))
|
||||
gov.logger.Info("cgov: ignoring vaa because the emitter address is not configured", zap.String("msgID", msgId))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// We only care about transfers.
|
||||
if !vaa.IsTransfer(msg.Payload) {
|
||||
gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msg.MessageIDString()))
|
||||
gov.logger.Info("cgov: ignoring vaa because it is not a transfer", zap.String("msgID", msgId))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
|
||||
gov.logger.Error("cgov: failed to decode vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return true, err
|
||||
}
|
||||
|
||||
|
@ -320,26 +327,33 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
|
||||
token, exists := gov.tokens[tk]
|
||||
if !exists {
|
||||
gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msg.MessageIDString()))
|
||||
gov.logger.Info("cgov: ignoring vaa because the token is not in the list", zap.String("msgID", msgId))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If we've already seen this message via quorum, we can publish it.
|
||||
xferComplete, exists := gov.msgsById[msgId]
|
||||
if exists {
|
||||
gov.logger.Info("cgov: ignoring vaa because it has already been seen", zap.String("msgID", msgId))
|
||||
return xferComplete, nil
|
||||
}
|
||||
|
||||
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
|
||||
prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
|
||||
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
|
||||
gov.logger.Error("cgov: failed to trim transfers", zap.String("msgID", msgId), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
value, err := computeValue(payload.Amount, token)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
|
||||
gov.logger.Error("cgov: failed to compute value of transfer", zap.String("msgID", msgId), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
newTotalValue := prevTotalValue + value
|
||||
if newTotalValue < prevTotalValue {
|
||||
gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msg.MessageIDString()), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue))
|
||||
gov.logger.Error("cgov: total value has overflowed", zap.String("msgID", msgId), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue))
|
||||
return false, fmt.Errorf("total value has overflowed")
|
||||
}
|
||||
|
||||
|
@ -352,7 +366,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
zap.Uint64("value", value),
|
||||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.String("msgID", msg.MessageIDString()),
|
||||
zap.String("msgID", msgId),
|
||||
zap.Stringer("releaseTime", releaseTime),
|
||||
zap.Uint64("bigTransactionSize", ce.bigTransactionSize),
|
||||
)
|
||||
|
@ -364,16 +378,17 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.Stringer("releaseTime", releaseTime),
|
||||
zap.String("msgID", msg.MessageIDString()),
|
||||
zap.String("msgID", msgId),
|
||||
)
|
||||
}
|
||||
|
||||
if enqueueIt {
|
||||
dbData := db.PendingTransfer{ReleaseTime: releaseTime, Msg: *msg}
|
||||
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: dbData})
|
||||
gov.msgsById[msgId] = transferEnqueued
|
||||
err = gov.db.StorePendingMsg(&dbData)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
|
||||
gov.logger.Error("cgov: failed to store pending vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
@ -384,19 +399,143 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
|
|||
zap.Uint64("value", value),
|
||||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.String("msgID", msg.MessageIDString()))
|
||||
zap.String("msgID", msgId))
|
||||
|
||||
xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msg.MessageIDString()}
|
||||
ce.transfers = append(ce.transfers, &xfer)
|
||||
err = gov.db.StoreTransfer(&xfer)
|
||||
xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: msg.EmitterChain, EmitterAddress: msg.EmitterAddress, MsgID: msgId}
|
||||
ce.transfers = append(ce.transfers, xfer)
|
||||
gov.msgsById[msgId] = transferComplete
|
||||
err = gov.db.StoreTransfer(xfer)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msg.MessageIDString()), zap.Error(err))
|
||||
gov.logger.Error("cgov: failed to store transfer", zap.String("msgID", msgId), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Note: This function only gets called once for each VAA. If the processor finds the VAA in the DB, this function will not be called.
|
||||
func (gov *ChainGovernor) ProcessInboundQuorum(v *vaa.VAA) {
|
||||
if v == nil {
|
||||
gov.logger.Error("cgov: received inbound quorum event with nil vaa")
|
||||
return
|
||||
}
|
||||
|
||||
msgId := v.MessageID()
|
||||
|
||||
gov.mutex.Lock()
|
||||
defer gov.mutex.Unlock()
|
||||
|
||||
ce, exists := gov.chains[v.EmitterChain]
|
||||
|
||||
// If we don't care about this chain, we can ignore this VAA.
|
||||
if !exists {
|
||||
gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter chain is not configured", zap.String("msgID", msgId))
|
||||
return
|
||||
}
|
||||
|
||||
// If we don't care about this emitter, we can ignore this VAA.
|
||||
if v.EmitterAddress != ce.emitterAddr {
|
||||
gov.logger.Info("cgov: ignoring incoming quorum vaa because the emitter address is not configured", zap.String("msgID", msgId))
|
||||
return
|
||||
}
|
||||
|
||||
// We only care about transfers.
|
||||
if !vaa.IsTransfer(v.Payload) {
|
||||
gov.logger.Info("cgov: ignoring incoming quorum vaa because it is not a transfer", zap.String("msgID", msgId))
|
||||
return
|
||||
}
|
||||
|
||||
payload, err := vaa.DecodeTransferPayloadHdr(v.Payload)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to decode incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// If we don't care about this token, we can ignore this VAA.
|
||||
tk := tokenKey{chain: payload.OriginChain, addr: payload.OriginAddress}
|
||||
token, exists := gov.tokens[tk]
|
||||
if !exists {
|
||||
gov.logger.Info("cgov: ignoring incoming quorum vaa because the token is not in the list", zap.String("msgID", msgId))
|
||||
return
|
||||
}
|
||||
|
||||
// See if we have already processed this VAA.
|
||||
xferComplete, exists := gov.msgsById[msgId]
|
||||
if exists {
|
||||
if xferComplete {
|
||||
gov.logger.Info("cgov: ignoring incoming quorum vaa because we've already seen it", zap.String("msgID", msgId))
|
||||
return
|
||||
}
|
||||
|
||||
// If we get here, the VAA is enqueued.
|
||||
foundIt := false
|
||||
for _, ce := range gov.chains {
|
||||
for idx, pe := range ce.pending {
|
||||
if msgId == pe.dbData.Msg.MessageIDString() {
|
||||
gov.logger.Info("cgov: received incoming quorum for transfer in the enqueued list, removing it from the pending list", zap.String("msgID", msgId))
|
||||
if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {
|
||||
gov.logger.Error("cgov: failed to delete pending entry", zap.String("msgId", msgId), zap.Error(err))
|
||||
// Continue on so that our data structures are accurate.
|
||||
}
|
||||
|
||||
ce.pending = append(ce.pending[:idx], ce.pending[idx+1:]...)
|
||||
delete(gov.msgsById, msgId)
|
||||
foundIt = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if foundIt {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !foundIt {
|
||||
gov.logger.Error("cgov: failed to find pending entry for incoming quorum vaa, adding it to the list of transfers", zap.String("msgId", msgId))
|
||||
}
|
||||
}
|
||||
|
||||
value, err := computeValue(payload.Amount, token)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to compute value of incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
|
||||
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to trim transfers for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
newTotalValue := prevTotalValue + value
|
||||
|
||||
if newTotalValue > ce.dailyLimit {
|
||||
gov.logger.Error("cgov: incoming quorum vaa put us over the daily limit",
|
||||
zap.Uint64("value", value),
|
||||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.Uint64("dailyLimit", ce.dailyLimit),
|
||||
zap.String("msgID", msgId))
|
||||
} else {
|
||||
gov.logger.Info("cgov: adding incoming quorum vaa",
|
||||
zap.Uint64("value", value),
|
||||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.String("msgID", msgId))
|
||||
}
|
||||
|
||||
xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: token.token.chain, OriginAddress: token.token.addr, EmitterChain: v.EmitterChain, EmitterAddress: v.EmitterAddress, MsgID: msgId}
|
||||
ce.transfers = append(ce.transfers, xfer)
|
||||
gov.msgsById[msgId] = transferComplete
|
||||
err = gov.db.StoreTransfer(xfer)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to store transfer for incoming quorum vaa", zap.String("msgID", msgId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
|
||||
return gov.CheckPendingForTime(time.Now())
|
||||
}
|
||||
|
@ -419,7 +558,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
|
|||
// Keep going as long as we find something that will fit.
|
||||
for {
|
||||
foundOne := false
|
||||
prevTotalValue, err := ce.TrimAndSumValue(startTime, gov.db)
|
||||
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to trim transfers", zap.Error(err))
|
||||
gov.msgsToPublish = msgsToPublish
|
||||
|
@ -428,12 +567,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
|
|||
|
||||
// Keep going until we find something that fits or hit the end.
|
||||
for idx, pe := range ce.pending {
|
||||
msgId := pe.dbData.Msg.MessageIDString()
|
||||
value, err := computeValue(pe.amount, pe.token)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to compute value for pending vaa",
|
||||
zap.Stringer("amount", pe.amount),
|
||||
zap.Stringer("price", pe.token.price),
|
||||
zap.String("msgID", pe.dbData.Msg.MessageIDString()),
|
||||
zap.String("msgID", msgId),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
|
@ -453,7 +593,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
|
|||
zap.Stringer("price", pe.token.price),
|
||||
zap.Uint64("value", value),
|
||||
zap.Stringer("releaseTime", pe.dbData.ReleaseTime),
|
||||
zap.String("msgID", pe.dbData.Msg.MessageIDString()))
|
||||
zap.String("msgID", msgId))
|
||||
} else if now.After(pe.dbData.ReleaseTime) {
|
||||
countsTowardsTransfers = false
|
||||
gov.logger.Info("cgov: posting pending vaa because the release time has been reached",
|
||||
|
@ -461,7 +601,7 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
|
|||
zap.Stringer("price", pe.token.price),
|
||||
zap.Uint64("value", value),
|
||||
zap.Stringer("releaseTime", pe.dbData.ReleaseTime),
|
||||
zap.String("msgID", pe.dbData.Msg.MessageIDString()))
|
||||
zap.String("msgID", msgId))
|
||||
} else {
|
||||
newTotalValue := prevTotalValue + value
|
||||
if newTotalValue < prevTotalValue {
|
||||
|
@ -480,21 +620,24 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
|
|||
zap.Uint64("value", value),
|
||||
zap.Uint64("prevTotalValue", prevTotalValue),
|
||||
zap.Uint64("newTotalValue", newTotalValue),
|
||||
zap.String("msgID", pe.dbData.Msg.MessageIDString()))
|
||||
zap.String("msgID", msgId))
|
||||
}
|
||||
|
||||
// If we get here, publish it and remove it from the pending list.
|
||||
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)
|
||||
|
||||
if countsTowardsTransfers {
|
||||
xfer := db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr,
|
||||
EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: pe.dbData.Msg.MessageIDString()}
|
||||
ce.transfers = append(ce.transfers, &xfer)
|
||||
gov.msgsById[msgId] = transferComplete
|
||||
xfer := &db.Transfer{Timestamp: now, Value: value, OriginChain: pe.token.token.chain, OriginAddress: pe.token.token.addr,
|
||||
EmitterChain: pe.dbData.Msg.EmitterChain, EmitterAddress: pe.dbData.Msg.EmitterAddress, MsgID: msgId}
|
||||
ce.transfers = append(ce.transfers, xfer)
|
||||
|
||||
if err := gov.db.StoreTransfer(&xfer); err != nil {
|
||||
if err := gov.db.StoreTransfer(xfer); err != nil {
|
||||
gov.msgsToPublish = msgsToPublish
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
delete(gov.msgsById, msgId)
|
||||
}
|
||||
|
||||
if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {
|
||||
|
@ -535,12 +678,12 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
|
|||
return value, nil
|
||||
}
|
||||
|
||||
func (ce *chainEntry) TrimAndSumValue(startTime time.Time, db db.GovernorDB) (sum uint64, err error) {
|
||||
sum, ce.transfers, err = TrimAndSumValue(ce.transfers, startTime, db)
|
||||
func (gov *ChainGovernor) TrimAndSumValueForChain(ce *chainEntry, startTime time.Time) (sum uint64, err error) {
|
||||
sum, ce.transfers, err = gov.TrimAndSumValue(ce.transfers, startTime)
|
||||
return sum, err
|
||||
}
|
||||
|
||||
func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.GovernorDB) (uint64, []*db.Transfer, error) {
|
||||
func (gov *ChainGovernor) TrimAndSumValue(transfers []*db.Transfer, startTime time.Time) (uint64, []*db.Transfer, error) {
|
||||
if len(transfers) == 0 {
|
||||
return 0, transfers, nil
|
||||
}
|
||||
|
@ -557,11 +700,10 @@ func TrimAndSumValue(transfers []*db.Transfer, startTime time.Time, db db.Govern
|
|||
}
|
||||
|
||||
if trimIdx >= 0 {
|
||||
if db != nil {
|
||||
for idx := 0; idx <= trimIdx; idx++ {
|
||||
if err := db.DeleteTransfer(transfers[idx]); err != nil {
|
||||
return 0, transfers, err
|
||||
}
|
||||
for idx := 0; idx <= trimIdx; idx++ {
|
||||
delete(gov.msgsById, transfers[idx].MsgID)
|
||||
if err := gov.db.DeleteTransfer(transfers[idx]); err != nil {
|
||||
return 0, transfers, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -58,10 +58,11 @@ func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
|
|||
|
||||
func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now time.Time) {
|
||||
msg := &pending.Msg
|
||||
msgId := msg.MessageIDString()
|
||||
ce, exists := gov.chains[msg.EmitterChain]
|
||||
if !exists {
|
||||
gov.logger.Error("cgov: reloaded pending transfer for unsupported chain, dropping it",
|
||||
zap.String("MsgID", msg.MessageIDString()),
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
|
@ -75,7 +76,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
|
|||
|
||||
if msg.EmitterAddress != ce.emitterAddr {
|
||||
gov.logger.Error("cgov: reloaded pending transfer for unsupported emitter address, dropping it",
|
||||
zap.String("MsgID", msg.MessageIDString()),
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
|
@ -90,7 +91,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
|
|||
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to parse payload for reloaded pending transfer, dropping it",
|
||||
zap.String("MsgID", msg.MessageIDString()),
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
|
@ -109,7 +110,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
|
|||
token, exists := gov.tokens[tk]
|
||||
if !exists {
|
||||
gov.logger.Error("cgov: reloaded pending transfer for unsupported token, dropping it",
|
||||
zap.String("MsgID", msg.MessageIDString()),
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
|
@ -123,19 +124,34 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer, now
|
|||
return
|
||||
}
|
||||
|
||||
gov.logger.Info("cgov: reloaded pending transfer",
|
||||
zap.String("MsgID", msg.MessageIDString()),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
zap.Uint64("Sequence", msg.Sequence),
|
||||
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
|
||||
zap.Stringer("EmitterChain", msg.EmitterChain),
|
||||
zap.Stringer("EmitterAddress", msg.EmitterAddress),
|
||||
zap.Stringer("Amount", payload.Amount),
|
||||
)
|
||||
if _, exists := gov.msgsById[msgId]; !exists {
|
||||
gov.logger.Info("cgov: reloaded pending transfer",
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
zap.Uint64("Sequence", msg.Sequence),
|
||||
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
|
||||
zap.Stringer("EmitterChain", msg.EmitterChain),
|
||||
zap.Stringer("EmitterAddress", msg.EmitterAddress),
|
||||
zap.Stringer("Amount", payload.Amount),
|
||||
)
|
||||
|
||||
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending})
|
||||
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, dbData: *pending})
|
||||
gov.msgsById[msgId] = transferEnqueued
|
||||
} else {
|
||||
gov.logger.Info("cgov: dropping duplicate pending transfer",
|
||||
zap.String("MsgID", msgId),
|
||||
zap.Stringer("TxHash", msg.TxHash),
|
||||
zap.Stringer("Timestamp", msg.Timestamp),
|
||||
zap.Uint32("Nonce", msg.Nonce),
|
||||
zap.Uint64("Sequence", msg.Sequence),
|
||||
zap.Uint8("ConsistencyLevel", msg.ConsistencyLevel),
|
||||
zap.Stringer("EmitterChain", msg.EmitterChain),
|
||||
zap.Stringer("EmitterAddress", msg.EmitterAddress),
|
||||
zap.Stringer("Amount", payload.Amount),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, startTime time.Time) {
|
||||
|
@ -175,13 +191,24 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer, now time.Time, start
|
|||
return
|
||||
}
|
||||
|
||||
gov.logger.Info("cgov: reloaded transfer",
|
||||
zap.Stringer("Timestamp", xfer.Timestamp),
|
||||
zap.Uint64("Value", xfer.Value),
|
||||
zap.Stringer("OriginChain", xfer.OriginChain),
|
||||
zap.Stringer("OriginAddress", xfer.OriginAddress),
|
||||
zap.String("MsgID", xfer.MsgID),
|
||||
)
|
||||
if _, exists := gov.msgsById[xfer.MsgID]; !exists {
|
||||
gov.logger.Info("cgov: reloaded transfer",
|
||||
zap.Stringer("Timestamp", xfer.Timestamp),
|
||||
zap.Uint64("Value", xfer.Value),
|
||||
zap.Stringer("OriginChain", xfer.OriginChain),
|
||||
zap.Stringer("OriginAddress", xfer.OriginAddress),
|
||||
zap.String("MsgID", xfer.MsgID),
|
||||
)
|
||||
|
||||
ce.transfers = append(ce.transfers, xfer)
|
||||
ce.transfers = append(ce.transfers, xfer)
|
||||
gov.msgsById[xfer.MsgID] = transferComplete
|
||||
} else {
|
||||
gov.logger.Info("cgov: dropping duplicate transfer",
|
||||
zap.Stringer("Timestamp", xfer.Timestamp),
|
||||
zap.Uint64("Value", xfer.Value),
|
||||
zap.Stringer("OriginChain", xfer.OriginChain),
|
||||
zap.Stringer("OriginAddress", xfer.OriginAddress),
|
||||
zap.String("MsgID", xfer.MsgID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,17 +111,27 @@ func (gov *ChainGovernor) getStatsForAllChains() (numTrans int, valueTrans uint6
|
|||
}
|
||||
|
||||
func TestTrimEmptyTransfers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
|
||||
require.NoError(t, err)
|
||||
|
||||
var transfers []*db.Transfer
|
||||
sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
|
||||
sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, uint64(0), sum)
|
||||
assert.Equal(t, 0, len(updatedTransfers))
|
||||
}
|
||||
|
||||
func TestSumAllFromToday(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -129,13 +139,18 @@ func TestSumAllFromToday(t *testing.T) {
|
|||
transferTime, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 11:00am (CST)")
|
||||
require.NoError(t, err)
|
||||
transfers = append(transfers, &db.Transfer{Value: 125000, Timestamp: transferTime})
|
||||
sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
|
||||
sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, uint64(125000), sum)
|
||||
assert.Equal(t, 1, len(updatedTransfers))
|
||||
}
|
||||
|
||||
func TestTrimOneOfTwoTransfers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -152,13 +167,18 @@ func TestTrimOneOfTwoTransfers(t *testing.T) {
|
|||
transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
|
||||
assert.Equal(t, 2, len(transfers))
|
||||
|
||||
sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
|
||||
sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, len(updatedTransfers))
|
||||
assert.Equal(t, uint64(225000), sum)
|
||||
}
|
||||
|
||||
func TestTrimSeveralTransfers(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -188,13 +208,18 @@ func TestTrimSeveralTransfers(t *testing.T) {
|
|||
|
||||
assert.Equal(t, 5, len(transfers))
|
||||
|
||||
sum, updatedTransfers, err := TrimAndSumValue(transfers, now.Add(-time.Hour*24), nil)
|
||||
sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now.Add(-time.Hour*24))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 3, len(updatedTransfers))
|
||||
assert.Equal(t, uint64(465000), sum)
|
||||
}
|
||||
|
||||
func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -209,7 +234,7 @@ func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
|
|||
transfers = append(transfers, &db.Transfer{Value: 225000, Timestamp: transferTime2})
|
||||
assert.Equal(t, 2, len(transfers))
|
||||
|
||||
sum, updatedTransfers, err := TrimAndSumValue(transfers, now, nil)
|
||||
sum, updatedTransfers, err := gov.TrimAndSumValue(transfers, now)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, len(updatedTransfers))
|
||||
assert.Equal(t, uint64(0), sum)
|
||||
|
@ -292,6 +317,7 @@ func TestVaaForUninterestingEmitterChain(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestVaaForUninterestingEmitterAddress(t *testing.T) {
|
||||
|
@ -324,6 +350,7 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestVaaForUninterestingPayloadType(t *testing.T) {
|
||||
|
@ -356,6 +383,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
}
|
||||
|
||||
// Note this method assumes 18 decimals for the amount.
|
||||
|
@ -459,6 +487,7 @@ func TestVaaForUninterestingToken(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
||||
|
@ -489,7 +518,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
)
|
||||
|
||||
// The first two transfers should be accepted.
|
||||
msg := common.MessagePublication{
|
||||
msg1 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
|
@ -500,7 +529,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
|
||||
canPost, err := gov.ProcessMsgForTime(&msg1, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
|
@ -509,8 +538,20 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
|
||||
msg2 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(2),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
canPost, err = gov.ProcessMsgForTime(&msg2, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
|
@ -519,6 +560,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
assert.Equal(t, uint64(4436), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsById))
|
||||
|
||||
// But the third one should be queued up.
|
||||
payloadBytes2 := buildMockTransferPayloadBytes(1,
|
||||
|
@ -529,9 +571,18 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
1250,
|
||||
)
|
||||
|
||||
msg.Payload = payloadBytes2
|
||||
msg3 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(3),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes2,
|
||||
}
|
||||
|
||||
canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
|
||||
canPost, err = gov.ProcessMsgForTime(&msg3, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
|
@ -540,10 +591,21 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
assert.Equal(t, uint64(4436), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(2218274), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
// But a small one should still go through.
|
||||
msg.Payload = payloadBytes1
|
||||
canPost, err = gov.ProcessMsgForTime(&msg, time.Now())
|
||||
msg4 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(4),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
canPost, err = gov.ProcessMsgForTime(&msg4, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
|
@ -552,6 +614,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) {
|
|||
assert.Equal(t, uint64(4436+2218), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(2218274), valuePending)
|
||||
assert.Equal(t, 4, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestPendingTransferBeingReleased(t *testing.T) {
|
||||
|
@ -603,6 +666,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// And so should the second.
|
||||
payloadBytes2 := buildMockTransferPayloadBytes(1,
|
||||
|
@ -617,7 +681,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(2),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -634,6 +698,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsById))
|
||||
|
||||
// But the third one should be queued up.
|
||||
payloadBytes3 := buildMockTransferPayloadBytes(1,
|
||||
|
@ -648,7 +713,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(3),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -665,6 +730,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(496893), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
// And so should the fourth one.
|
||||
payloadBytes4 := buildMockTransferPayloadBytes(1,
|
||||
|
@ -679,7 +745,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(4),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -696,6 +762,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 2, numPending)
|
||||
assert.Equal(t, uint64(496893+532385), valuePending)
|
||||
assert.Equal(t, 4, len(gov.msgsById))
|
||||
|
||||
// If we check pending before noon, nothing should happen.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
|
||||
|
@ -708,6 +775,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 2, numPending)
|
||||
assert.Equal(t, uint64(496893+532385), valuePending)
|
||||
assert.Equal(t, 4, len(gov.msgsById))
|
||||
|
||||
// But at 3pm, the first one should drop off and the first queued one should get posted.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
|
||||
|
@ -721,6 +789,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(488020+496893), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(532385), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
||||
|
@ -770,13 +839,14 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// And so should the second.
|
||||
msg2 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(2),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -799,13 +869,14 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsById))
|
||||
|
||||
// But the third, big one should be queued up.
|
||||
msg3 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(3),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -828,13 +899,14 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(887309), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
// A fourth, smaller, but still too big one, should get enqueued.
|
||||
msg4 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(4),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -857,13 +929,14 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 2, numPending)
|
||||
assert.Equal(t, uint64(887309+177461), valuePending)
|
||||
assert.Equal(t, 4, len(gov.msgsById))
|
||||
|
||||
// A fifth, smaller, but still too big one, should also get enqueued.
|
||||
msg5 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(5),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -886,13 +959,14 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 3, numPending)
|
||||
assert.Equal(t, uint64(887309+177461+179236), valuePending)
|
||||
assert.Equal(t, 5, len(gov.msgsById))
|
||||
|
||||
// A sixth, big one should also get enqueued.
|
||||
msg6 := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
Sequence: uint64(6),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
|
@ -915,6 +989,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 4, numPending)
|
||||
assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
|
||||
assert.Equal(t, 6, len(gov.msgsById))
|
||||
|
||||
// If we check pending before noon, nothing should happen.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
|
||||
|
@ -927,6 +1002,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(479147+488020), valueTrans)
|
||||
assert.Equal(t, 4, numPending)
|
||||
assert.Equal(t, uint64(887309+177461+179236+889084), valuePending)
|
||||
assert.Equal(t, 6, len(gov.msgsById))
|
||||
|
||||
// But at 3pm, the first one should drop off. This should result in the second and third, smaller pending ones being posted, but not the two big ones.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 3:00pm (CST)")
|
||||
|
@ -940,6 +1016,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
|
|||
assert.Equal(t, uint64(488020+177461+179236), valueTrans)
|
||||
assert.Equal(t, 2, numPending)
|
||||
assert.Equal(t, uint64(887309+889084), valuePending)
|
||||
assert.Equal(t, 5, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestMainnetConfigIsValid(t *testing.T) {
|
||||
|
@ -1009,6 +1086,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(88730), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// And so should the second.
|
||||
msg2 := common.MessagePublication{
|
||||
|
@ -1038,6 +1116,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(88730+88730), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsById))
|
||||
|
||||
// But the third big one should get enqueued.
|
||||
msg3 := common.MessagePublication{
|
||||
|
@ -1067,6 +1146,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(88730+88730), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
// If we check pending before noon, nothing should happen.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)")
|
||||
|
@ -1080,12 +1160,14 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(88730+88730), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
assert.Equal(t, 2, numTrans)
|
||||
assert.Equal(t, uint64(88730+88730), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 3, len(gov.msgsById))
|
||||
|
||||
// But just after noon, the first one should drop off. The big pending one should not be affected.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
|
||||
|
@ -1099,6 +1181,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(88730), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsById))
|
||||
|
||||
// And Just after 6pm, the second one should drop off. The big pending one should still not be affected.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 6:01pm (CST)")
|
||||
|
@ -1112,6 +1195,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// 23 hours after the big transaction is enqueued, it should still be there.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 3, 2022 at 1:01am (CST)")
|
||||
|
@ -1125,8 +1209,9 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// // But then the operator resets the release time.
|
||||
// But then the operator resets the release time.
|
||||
_, err = gov.resetReleaseTimerForTime(msg3.MessageIDString(), now)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1142,6 +1227,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(177461), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// But finally, a full 24hrs, it should get released.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 4, 2022 at 1:01am (CST)")
|
||||
|
@ -1155,6 +1241,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
|
||||
// But the big transaction should not affect the daily notional.
|
||||
ce, exists := gov.chains[vaa.ChainIDEthereum]
|
||||
|
@ -1216,6 +1303,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(88730), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// If we check 23hrs later, nothing should happen.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 11:00am (CST)")
|
||||
|
@ -1229,6 +1317,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(88730), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
// But after 24hrs, it should get released.
|
||||
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
|
||||
|
@ -1242,6 +1331,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 0, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestIsBigTransfer(t *testing.T) {
|
||||
|
@ -1310,3 +1400,395 @@ func TestTransferPayloadTooShort(t *testing.T) {
|
|||
canPost := gov.ProcessMsg(&msg)
|
||||
assert.Equal(t, false, canPost)
|
||||
}
|
||||
|
||||
func TestQuorumAfterTransferCompleteIsIgnored(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
|
||||
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
|
||||
tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
|
||||
tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
gov.setDayLengthInMinutes(24 * 60)
|
||||
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
|
||||
require.NoError(t, err)
|
||||
err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
|
||||
require.NoError(t, err)
|
||||
|
||||
payloadBytes1 := buildMockTransferPayloadBytes(1,
|
||||
vaa.ChainIDEthereum,
|
||||
tokenAddrStr,
|
||||
vaa.ChainIDPolygon,
|
||||
toAddrStr,
|
||||
1.25,
|
||||
)
|
||||
|
||||
msg := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
assert.Equal(t, true, canPost)
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
v := &vaa.VAA{
|
||||
Version: vaa.SupportedVAAVersion,
|
||||
GuardianSetIndex: 1,
|
||||
Signatures: nil,
|
||||
Timestamp: msg.Timestamp,
|
||||
Nonce: msg.Nonce,
|
||||
EmitterChain: msg.EmitterChain,
|
||||
EmitterAddress: msg.EmitterAddress,
|
||||
Payload: msg.Payload,
|
||||
Sequence: msg.Sequence,
|
||||
ConsistencyLevel: msg.ConsistencyLevel,
|
||||
}
|
||||
|
||||
gov.ProcessInboundQuorum(v)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestQuorumWhenEnqueuedUpdatesTheWindowAndDropsTheEnqueuedEvent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
|
||||
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
|
||||
tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
|
||||
tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
gov.setDayLengthInMinutes(24 * 60)
|
||||
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
|
||||
require.NoError(t, err)
|
||||
err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
|
||||
require.NoError(t, err)
|
||||
|
||||
payloadBytes1 := buildMockTransferPayloadBytes(1,
|
||||
vaa.ChainIDEthereum,
|
||||
tokenAddrStr,
|
||||
vaa.ChainIDPolygon,
|
||||
toAddrStr,
|
||||
1000,
|
||||
)
|
||||
|
||||
msg := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
assert.Equal(t, false, canPost)
|
||||
assert.Equal(t, 0, numTrans)
|
||||
assert.Equal(t, uint64(0), valueTrans)
|
||||
assert.Equal(t, 1, numPending)
|
||||
assert.Equal(t, uint64(1_774_619), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
v := &vaa.VAA{
|
||||
Version: vaa.SupportedVAAVersion,
|
||||
GuardianSetIndex: 1,
|
||||
Signatures: nil,
|
||||
Timestamp: msg.Timestamp,
|
||||
Nonce: msg.Nonce,
|
||||
EmitterChain: msg.EmitterChain,
|
||||
EmitterAddress: msg.EmitterAddress,
|
||||
Payload: msg.Payload,
|
||||
Sequence: msg.Sequence,
|
||||
ConsistencyLevel: msg.ConsistencyLevel,
|
||||
}
|
||||
|
||||
gov.ProcessInboundQuorum(v)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(1_774_619), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestQuorumBeforeLocalMessageUpdatesTheWindow(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
|
||||
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
|
||||
tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
|
||||
tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
gov.setDayLengthInMinutes(24 * 60)
|
||||
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
|
||||
require.NoError(t, err)
|
||||
err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
|
||||
require.NoError(t, err)
|
||||
|
||||
payloadBytes1 := buildMockTransferPayloadBytes(1,
|
||||
vaa.ChainIDEthereum,
|
||||
tokenAddrStr,
|
||||
vaa.ChainIDPolygon,
|
||||
toAddrStr,
|
||||
1.25,
|
||||
)
|
||||
|
||||
msg := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
v := &vaa.VAA{
|
||||
Version: vaa.SupportedVAAVersion,
|
||||
GuardianSetIndex: 1,
|
||||
Signatures: nil,
|
||||
Timestamp: msg.Timestamp,
|
||||
Nonce: msg.Nonce,
|
||||
EmitterChain: msg.EmitterChain,
|
||||
EmitterAddress: msg.EmitterAddress,
|
||||
Payload: msg.Payload,
|
||||
Sequence: msg.Sequence,
|
||||
ConsistencyLevel: msg.ConsistencyLevel,
|
||||
}
|
||||
|
||||
gov.ProcessInboundQuorum(v)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestLocalMessageAfterQuorumIsPublishedButNotAddedToTheWindowAgain(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
|
||||
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
|
||||
tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
|
||||
tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
gov.setDayLengthInMinutes(24 * 60)
|
||||
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 1000000, 0)
|
||||
require.NoError(t, err)
|
||||
err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
|
||||
require.NoError(t, err)
|
||||
|
||||
payloadBytes1 := buildMockTransferPayloadBytes(1,
|
||||
vaa.ChainIDEthereum,
|
||||
tokenAddrStr,
|
||||
vaa.ChainIDPolygon,
|
||||
toAddrStr,
|
||||
1.25,
|
||||
)
|
||||
|
||||
msg := common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(1),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: tokenBridgeAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payloadBytes1,
|
||||
}
|
||||
|
||||
v := &vaa.VAA{
|
||||
Version: vaa.SupportedVAAVersion,
|
||||
GuardianSetIndex: 1,
|
||||
Signatures: nil,
|
||||
Timestamp: msg.Timestamp,
|
||||
Nonce: msg.Nonce,
|
||||
EmitterChain: msg.EmitterChain,
|
||||
EmitterAddress: msg.EmitterAddress,
|
||||
Payload: msg.Payload,
|
||||
Sequence: msg.Sequence,
|
||||
ConsistencyLevel: msg.ConsistencyLevel,
|
||||
}
|
||||
|
||||
gov.ProcessInboundQuorum(v)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
|
||||
canPost, err := gov.ProcessMsgForTime(&msg, time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
|
||||
assert.Equal(t, true, canPost)
|
||||
assert.Equal(t, 1, numTrans)
|
||||
assert.Equal(t, uint64(2218), valueTrans)
|
||||
assert.Equal(t, 0, numPending)
|
||||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 1, len(gov.msgsById))
|
||||
}
|
||||
|
||||
func TestDontReloadDuplicates(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gov, err := newChainGovernorForTest(ctx)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, gov)
|
||||
|
||||
emitterAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
|
||||
emitterAddr, err := vaa.StringToAddress(emitterAddrStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
|
||||
tokenAddr, err := vaa.StringToAddress(tokenAddrStr)
|
||||
require.NoError(t, err)
|
||||
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
gov.setDayLengthInMinutes(24 * 60)
|
||||
err = gov.setChainForTesting(vaa.ChainIDEthereum, emitterAddrStr, 1000000, 0)
|
||||
require.NoError(t, err)
|
||||
err = gov.setTokenForTesting(vaa.ChainIDEthereum, emitterAddrStr, "WETH", 1774.62)
|
||||
require.NoError(t, err)
|
||||
|
||||
now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
|
||||
startTime := now.Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))
|
||||
|
||||
var xfers []*db.Transfer
|
||||
|
||||
xfer1 := &db.Transfer{
|
||||
Timestamp: startTime.Add(time.Minute * 5),
|
||||
Value: uint64(1000),
|
||||
OriginChain: vaa.ChainIDEthereum,
|
||||
OriginAddress: tokenAddr,
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: emitterAddr,
|
||||
MsgID: "2/" + emitterAddrStr + "/125",
|
||||
}
|
||||
xfers = append(xfers, xfer1)
|
||||
|
||||
xfer2 := &db.Transfer{
|
||||
Timestamp: startTime.Add(time.Minute * 5),
|
||||
Value: uint64(2000),
|
||||
OriginChain: vaa.ChainIDEthereum,
|
||||
OriginAddress: tokenAddr,
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: emitterAddr,
|
||||
MsgID: "2/" + emitterAddrStr + "/126",
|
||||
}
|
||||
xfers = append(xfers, xfer2)
|
||||
|
||||
// Add a duplicate of each transfer
|
||||
xfers = append(xfers, xfer1)
|
||||
xfers = append(xfers, xfer2)
|
||||
assert.Equal(t, 4, len(xfers))
|
||||
|
||||
payload1 := buildMockTransferPayloadBytes(1,
|
||||
vaa.ChainIDEthereum,
|
||||
tokenAddrStr,
|
||||
vaa.ChainIDPolygon,
|
||||
toAddrStr,
|
||||
1.25,
|
||||
)
|
||||
|
||||
var pendings []*db.PendingTransfer
|
||||
pending1 := &db.PendingTransfer{
|
||||
ReleaseTime: now.Add(time.Hour * 24),
|
||||
Msg: common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(200),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: emitterAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payload1,
|
||||
},
|
||||
}
|
||||
pendings = append(pendings, pending1)
|
||||
|
||||
pending2 := &db.PendingTransfer{
|
||||
ReleaseTime: now.Add(time.Hour * 24),
|
||||
Msg: common.MessagePublication{
|
||||
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
|
||||
Timestamp: time.Unix(int64(1654543099), 0),
|
||||
Nonce: uint32(1),
|
||||
Sequence: uint64(201),
|
||||
EmitterChain: vaa.ChainIDEthereum,
|
||||
EmitterAddress: emitterAddr,
|
||||
ConsistencyLevel: uint8(32),
|
||||
Payload: payload1,
|
||||
},
|
||||
}
|
||||
pendings = append(pendings, pending2)
|
||||
|
||||
// Add a duplicate of each pending transfer
|
||||
pendings = append(pendings, pending1)
|
||||
pendings = append(pendings, pending2)
|
||||
assert.Equal(t, 4, len(pendings))
|
||||
|
||||
for _, p := range xfers {
|
||||
gov.reloadTransfer(p, now, startTime)
|
||||
}
|
||||
|
||||
for _, p := range pendings {
|
||||
gov.reloadPendingTransfer(p, now)
|
||||
}
|
||||
|
||||
numTrans, valueTrans, numPending, valuePending := gov.getStatsForAllChains()
|
||||
assert.Equal(t, 2, numTrans)
|
||||
assert.Equal(t, uint64(3000), valueTrans)
|
||||
assert.Equal(t, 2, numPending)
|
||||
assert.Equal(t, uint64(4436), valuePending)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
node_common "github.com/certusone/wormhole/node/pkg/common"
|
||||
"github.com/certusone/wormhole/node/pkg/db"
|
||||
"github.com/certusone/wormhole/node/pkg/governor"
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
|
@ -218,7 +219,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
|||
}
|
||||
}
|
||||
|
||||
func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {
|
||||
func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum, gov *governor.ChainGovernor) {
|
||||
v, err := vaa.Unmarshal(m.Vaa)
|
||||
if err != nil {
|
||||
p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message",
|
||||
|
@ -312,4 +313,8 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
|
|||
return
|
||||
}
|
||||
p.attestationEvents.ReportVAAQuorum(v)
|
||||
|
||||
if gov != nil {
|
||||
gov.ProcessInboundQuorum(v)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ func TestHandleInboundSignedVAAWithQuorum_NilGuardianSet(t *testing.T) {
|
|||
processor := Processor{}
|
||||
processor.logger = observedLogger
|
||||
|
||||
processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum)
|
||||
processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil)
|
||||
|
||||
// Check to see if we got an error, which we should have,
|
||||
// because a `gs` is not defined on processor
|
||||
|
@ -114,7 +114,7 @@ func TestHandleInboundSignedVAAWithQuorum(t *testing.T) {
|
|||
processor.gs = &guardianSet
|
||||
processor.logger = observedLogger
|
||||
|
||||
processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum)
|
||||
processor.handleInboundSignedVAAWithQuorum(ctx, signedVAAWithQuorum, nil)
|
||||
|
||||
// Check to see if we got an error, which we should have
|
||||
assert.Equal(t, 1, observedLogs.Len())
|
||||
|
|
|
@ -207,7 +207,7 @@ func (p *Processor) Run(ctx context.Context) error {
|
|||
case m := <-p.obsvC:
|
||||
p.handleObservation(ctx, m)
|
||||
case m := <-p.signedInC:
|
||||
p.handleInboundSignedVAAWithQuorum(ctx, m)
|
||||
p.handleInboundSignedVAAWithQuorum(ctx, m, p.governor)
|
||||
case <-p.cleanup.C:
|
||||
p.handleCleanup(ctx)
|
||||
case <-govTimer.C:
|
||||
|
|
Loading…
Reference in New Issue