diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index 146f2c26f..a71fb194f 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -18,6 +18,9 @@ func (gov *ChainGovernor) loadFromDB() error { return gov.loadFromDBAlreadyLocked() } +// loadFromDBAlreadyLocked method loads transfers and pending data from the database and modifies the corresponding fields in the ChainGovernor. +// These fields are slices of transfers or pendingTransfers and will be sorted by their Timestamp property. +// Modifies the state of the database as a side-effect: 'transfers' that are older than 24 hours are deleted. func (gov *ChainGovernor) loadFromDBAlreadyLocked() error { xfers, pending, err := gov.db.GetChainGovernorData(gov.logger) if err != nil { @@ -154,10 +157,16 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { zap.String("Hash", hash), ) + // Note: no flow cancel added here. We only want to add an inverse, flow-cancel transfer when the transfer is + // released from the pending queue, not when it's added. ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending}) gov.msgsSeen[hash] = transferEnqueued } +// reloadTransfer method processes a db.Transfer and validates that it should be loaded into `gov`. +// Modifies `gov` as a side-effect: when a valid transfer is loaded, the properties 'transfers' and 'msgsSeen' are +// updated with information about the loaded transfer. In the case where a flow-canceling asset's transfer is loaded, +// both chain entries (emitter and target) will be updated. func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { ce, exists := gov.chains[xfer.EmitterChain] if !exists { @@ -233,5 +242,27 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { return err } ce.transfers = append(ce.transfers, transfer) + + // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but + // that function does not capture flow-cancelling when the node is restarted. + tokenEntry := gov.tokens[tk] + if tokenEntry != nil { + // Mandatory check to ensure that the token should be able to reduce the Governor limit. + if tokenEntry.flowCancels { + if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { + if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + return err + } + } else { + gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + zap.String("msgID", xfer.MsgID), + zap.Stringer("token chain", xfer.OriginChain), + zap.Stringer("token address", xfer.OriginAddress), + zap.Stringer("target chain", xfer.TargetChain), + ) + } + } + } return nil } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 7cec609b1..964ec5334 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -2337,6 +2337,203 @@ func TestDontReloadDuplicates(t *testing.T) { assert.Equal(t, uint64(4436), valuePending) } +// With the addition of the flow-cancel feature, it's possible to in a way "exceed the daily limit" of outflow from a +// Governor as long as there is corresponding inflow of a flow-canceling asset to allow for additional outflow. +// When the node is restarted, it reloads all transfers and pending transfers. If the actual outflow is greater than +// the daily limit (due to flow cancel) ensure that the calculated limit on start-up is correct. +// This test ensures that governor usage limits are correctly calculated when reloading transfers from the database. +func TestReloadTransfersNearCapacity(t *testing.T) { + // Setup + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + + require.NoError(t, err) + assert.NotNil(t, gov) + + // Set-up time + gov.setDayLengthInMinutes(24 * 60) + transferTime := time.Now() + + // Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works + // when the Origin chain of the asset does not match the emitter chain + // NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified + var flowCancelTokenOriginAddress vaa.Address + flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61") + require.NoError(t, err) + + var notFlowCancelTokenOriginAddress vaa.Address + notFlowCancelTokenOriginAddress, err = vaa.StringToAddress("77777af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f7777") + require.NoError(t, err) + + // Data for Ethereum + tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec + tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum) + require.NoError(t, err) + + // Data for Sui + tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec + tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui) + require.NoError(t, err) + + // Data for Solana. Only used to represent the flow cancel asset. + // "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb" + tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec + + // Add chain entries to `gov` + err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, 10000, 50000) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, 10000, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, 10000, 0) + require.NoError(t, err) + + // Add flow cancel asset and non-flow cancelable asset to the token entry for `gov` + err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true) + require.NoError(t, err) + assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}]) + err = gov.setTokenForTesting(vaa.ChainIDEthereum, notFlowCancelTokenOriginAddress.String(), "NOTCANCELABLE", 1.0, false) + require.NoError(t, err) + + // This transfer should exhaust the dailyLimit for the emitter chain + xfer1 := &db.Transfer{ + Timestamp: transferTime.Add(-10), + Value: uint64(10000), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + TargetAddress: tokenBridgeAddrSui, + TargetChain: vaa.ChainIDSui, + MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125", + Hash: "Hash1", + } + + // This incoming transfer should free up some of the space on the previous emitter chain + xfer2 := &db.Transfer{ + Timestamp: transferTime.Add(-9), + Value: uint64(2000), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDSui, + EmitterAddress: tokenBridgeAddrSui, + TargetAddress: tokenBridgeAddrEthereum, + TargetChain: vaa.ChainIDEthereum, + MsgID: "2/" + tokenBridgeAddrSui.String() + "/126", + Hash: "Hash2", + } + + // Send another transfer out from the original emitter chain so that we "exceed the daily limit" if flow + // cancel is not applied + xfer3 := &db.Transfer{ + Timestamp: transferTime.Add(-8), + Value: uint64(50), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + TargetAddress: tokenBridgeAddrSui, + TargetChain: vaa.ChainIDSui, + MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125", + Hash: "Hash3", + } + + // Simulate reloading from the database. + // NOTE: The actual execution path we want to test is the following and runs when the node is restarted: + // gov.Run () --> gov.loadFromDb() --> gov.loadFromDBAlreadyLocked() --> gov.reloadTransfer() + // We don't have access to Run() from the test suite and the other functions are mocked to return `nil`. + // Therefore, the remainder of this test proceeds by operating on a list of `transfersLoadedFromDb` which + // simulates loading transfers from the database. + // From here we proceed with the next function we can actually test: `reloadTransfer()`. + + // STEP 0: Initial state + assert.Equal(t, len(gov.msgsSeen), 0) + numTrans, netValueTransferred, numPending, valuePending := gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 0, numTrans) + assert.Equal(t, int64(0), netValueTransferred) + assert.Equal(t, 0, numPending) + assert.Equal(t, uint64(0), valuePending) + + chainEntryEth, exists := gov.chains[vaa.ChainIDEthereum] + require.True(t, exists) + chainEntrySui, exists := gov.chains[vaa.ChainIDSui] + require.True(t, exists) + + // STEP 1: Load first transfer + err = gov.reloadTransfer(xfer1) + require.NoError(t, err) + assert.Equal(t, len(gov.msgsSeen), 1) + numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, numTrans) // 1 plus transfer the inverse flow transfer on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000) = 20_000 + _, valueTransferred, _, _ := gov.getStatsForAllChains() + assert.Equal(t, uint64(20000), valueTransferred) + + governorUsageEth, err := gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(10000), governorUsageEth) + assert.Zero(t, governorUsageEth-chainEntryEth.dailyLimit) // Make sure we used the whole capacity + require.NoError(t, err) + governorUsageSui, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, governorUsageSui) + require.NoError(t, err) + sumTransfersSui, _, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-10000), sumTransfersSui) + require.NoError(t, err) + + // STEP 2: Load second transfer + err = gov.reloadTransfer(xfer2) + require.NoError(t, err) + assert.Equal(t, len(gov.msgsSeen), 2) + numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 4, numTrans) // 2 transfers and their inverse flow transfers on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000 + 2_000) = 24_000 + _, valueTransferred, _, _ = gov.getStatsForAllChains() + assert.Equal(t, uint64(24000), valueTransferred) + + governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(8000), governorUsageEth) + assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 2000) // Remaining capacity + require.NoError(t, err) + governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, governorUsageSui) + require.NoError(t, err) + sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-8000), sumTransfersSui) + require.NoError(t, err) + + // STEP 3: Load third transfer + err = gov.reloadTransfer(xfer3) + require.NoError(t, err) + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000 + 2_000 + 50) = 24_100 + _, valueTransferred, _, _ = gov.getStatsForAllChains() + assert.Equal(t, uint64(24100), valueTransferred) + + numTrans, netValueTransferred, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 6, numTrans) // 3 transfers and their inverse flow transfers on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(8050), governorUsageEth) + assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 1950) // Remaining capacity + require.NoError(t, err) + governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + require.NoError(t, err) + assert.Zero(t, governorUsageSui) + sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-8050), sumTransfersSui) + require.NoError(t, err) + + // Sanity check: make sure these are still empty/zero + assert.Equal(t, 0, numPending) + assert.Equal(t, uint64(0), valuePending) +} + func TestReobservationOfPublishedMsg(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx)