Node/Acct: audit stuck pending

Change-Id: I643ce699b5d5e21129d957dae8677e8f2fccdbfd
Bruce Riley 2023-01-24 21:03:18 +00:00 committed by Evan Gray
parent 2e400bef1c
commit 04f5415c84
11 changed files with 484 additions and 94 deletions

View File

@ -1019,6 +1019,7 @@ func runNode(cmd *cobra.Command, args []string) {
rootCtx,
logger,
db,
obsvReqWriteC,
*accountantContract,
*accountantWS,
wormchainConn,

View File

@ -15,6 +15,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/wormconn"
"github.com/wormhole-foundation/wormhole/sdk"
@ -31,10 +32,6 @@ const (
TestNetMode = 2
DevNetMode = 3
GoTestMode = 4
// We will retry requests once per minute for up to an hour.
auditInterval = time.Duration(time.Minute)
maxRetries = 60
)
type (
@ -50,11 +47,14 @@ type (
// pendingEntry is the payload for each pending transfer
pendingEntry struct {
msg *common.MessagePublication
msgId string
digest string
updTime time.Time
retryCount int
msg *common.MessagePublication
msgId string
digest string
updTime time.Time
// submitPending indicates if the observation is either in the channel waiting to be submitted or in an outstanding transaction.
// The audit should not resubmit anything where submitPending is set to true.
submitPending bool
}
)
@ -63,6 +63,7 @@ type Accountant struct {
ctx context.Context
logger *zap.Logger
db db.AccountantDB
obsvReqWriteC chan<- *gossipv1.ObservationRequest
contract string
wsUrl string
wormchainConn *wormconn.ClientConn
@ -75,6 +76,7 @@ type Accountant struct {
pendingTransfersLock sync.Mutex
pendingTransfers map[string]*pendingEntry // Key is the message ID (emitterChain/emitterAddr/seqNo)
subChan chan *common.MessagePublication
lastAuditTime time.Time
env int
}
@ -85,6 +87,7 @@ func NewAccountant(
ctx context.Context,
logger *zap.Logger,
db db.AccountantDB,
obsvReqWriteC chan<- *gossipv1.ObservationRequest,
contract string, // the address of the smart contract on wormchain
wsUrl string, // the URL of the wormchain websocket interface
wormchainConn *wormconn.ClientConn, // used for communicating with the smart contract
@ -98,6 +101,7 @@ func NewAccountant(
ctx: ctx,
logger: logger,
db: db,
obsvReqWriteC: obsvReqWriteC,
contract: contract,
wsUrl: wsUrl,
wormchainConn: wormchainConn,
@ -115,7 +119,7 @@ func NewAccountant(
// Run initializes the accountant and starts the watcher runnable.
func (acct *Accountant) Start(ctx context.Context) error {
acct.logger.Debug("acct: entering run")
acct.logger.Debug("acct: entering Start", zap.Bool("enforceFlag", acct.enforceFlag))
acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock()
@ -151,11 +155,11 @@ func (acct *Accountant) Start(ctx context.Context) error {
// Start the watcher to listen to transfer events from the smart contract.
if acct.env != GoTestMode {
if err := supervisor.Run(ctx, "acctworker", acct.worker); err != nil {
if err := supervisor.Run(ctx, "acctworker", common.WrapWithScissors(acct.worker, "acctworker")); err != nil {
return fmt.Errorf("failed to start submit observation worker: %w", err)
}
if err := supervisor.Run(ctx, "acctwatcher", acct.watcher); err != nil {
if err := supervisor.Run(ctx, "acctwatcher", common.WrapWithScissors(acct.watcher, "acctwatcher")); err != nil {
return fmt.Errorf("failed to start watcher: %w", err)
}
}
@ -220,7 +224,8 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
}
// Add it to the pending map and the database.
if err := acct.addPendingTransferAlreadyLocked(msgId, msg, digest); err != nil {
pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest, updTime: time.Now()}
if err := acct.addPendingTransferAlreadyLocked(pe); err != nil {
acct.logger.Error("acct: failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err))
return false, err
}
@ -228,50 +233,13 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
// This transaction may take a while. Pass it off to the worker so we don't block the processor.
if acct.env != GoTestMode {
acct.logger.Info("acct: submitting transfer to accountant for approval", zap.String("msgID", msgId), zap.Bool("canPublish", !acct.enforceFlag))
acct.submitObservation(msg)
acct.submitObservation(pe)
}
// If we are not enforcing accountant, the event can be published. Otherwise we have to wait to hear back from the contract.
return !acct.enforceFlag, nil
}
// AuditPendingTransfers audits the set of pending transfers for any that have been in the pending state too long. This is called from the processor loop
// each timer interval. Any transfers that have been in the pending state too long will be resubmitted. Any that have been retried too many times
// will be logged and dropped.
func (acct *Accountant) AuditPendingTransfers() {
acct.logger.Debug("acct: in AuditPendingTransfers")
acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock()
if len(acct.pendingTransfers) == 0 {
acct.logger.Debug("acct: leaving AuditPendingTransfers, no pending transfers")
return
}
for msgId, pe := range acct.pendingTransfers {
acct.logger.Debug("acct: evaluating pending transfer", zap.String("msgID", msgId), zap.Stringer("updTime", pe.updTime))
if time.Since(pe.updTime) > auditInterval {
pe.retryCount += 1
if pe.retryCount > maxRetries {
acct.logger.Error("acct: stuck pending transfer has reached the retry limit, dropping it", zap.String("msgId", msgId))
acct.deletePendingTransferAlreadyLocked(msgId)
continue
}
acct.logger.Error("acct: resubmitting pending transfer",
zap.String("msgId", msgId),
zap.Stringer("lastUpdateTime", pe.updTime),
zap.Int("retryCount", pe.retryCount),
)
pe.updTime = time.Now()
acct.submitObservation(pe.msg)
}
}
acct.logger.Debug("acct: leaving AuditPendingTransfers")
}
// publishTransferAlreadyLocked publishes a pending transfer to the accountant channel and updates the timestamp. It assumes the caller holds the lock.
func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
if acct.enforceFlag {
@ -283,15 +251,14 @@ func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
}
// addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock.
func (acct *Accountant) addPendingTransferAlreadyLocked(msgId string, msg *common.MessagePublication, digest string) error {
acct.logger.Debug("acct: addPendingTransferAlreadyLocked", zap.String("msgId", msgId))
if err := acct.db.AcctStorePendingTransfer(msg); err != nil {
func (acct *Accountant) addPendingTransferAlreadyLocked(pe *pendingEntry) error {
acct.logger.Debug("acct: addPendingTransferAlreadyLocked", zap.String("msgId", pe.msgId))
if err := acct.db.AcctStorePendingTransfer(pe.msg); err != nil {
return err
}
pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest, updTime: time.Now()}
acct.pendingTransfers[msgId] = pe
transfersOutstanding.Inc()
acct.pendingTransfers[pe.msgId] = pe
transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
return nil
}
@ -306,7 +273,7 @@ func (acct *Accountant) deletePendingTransfer(msgId string) {
func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) {
acct.logger.Debug("acct: deletePendingTransfer", zap.String("msgId", msgId))
if _, exists := acct.pendingTransfers[msgId]; exists {
transfersOutstanding.Dec()
transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
delete(acct.pendingTransfers, msgId)
}
if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil {
@ -329,9 +296,9 @@ func (acct *Accountant) loadPendingTransfers() error {
digest := msg.CreateDigest()
pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest} // Leave the updTime unset so we will query this on the first audit interval.
acct.pendingTransfers[msgId] = pe
transfersOutstanding.Inc()
}
transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
if len(acct.pendingTransfers) != 0 {
acct.logger.Info("acct: reloaded pending transfers", zap.Int("total", len(acct.pendingTransfers)))
} else {
@ -344,18 +311,25 @@ func (acct *Accountant) loadPendingTransfers() error {
// submitObservation sends an observation request to the worker so it can be submitted to the contract.
// If writing to the channel would block, this function resets the timestamp on the entry so it will be
// retried next audit interval. This method assumes the caller holds the lock.
func (acct *Accountant) submitObservation(msg *common.MessagePublication) {
func (acct *Accountant) submitObservation(pe *pendingEntry) {
pe.submitPending = true
select {
case acct.subChan <- msg:
acct.logger.Debug("acct: submitted observation to channel", zap.String("msgId", msg.MessageIDString()))
case acct.subChan <- pe.msg:
acct.logger.Debug("acct: submitted observation to channel", zap.String("msgId", pe.msgId))
default:
msgId := msg.MessageIDString()
acct.logger.Error("acct: unable to submit observation because the channel is full, will try next interval", zap.String("msgId", msgId))
pe, exists := acct.pendingTransfers[msgId]
if exists {
pe.updTime = time.Time{}
} else {
acct.logger.Error("acct: failed to look up pending transfer", zap.String("msgId", msgId))
acct.logger.Error("acct: unable to submit observation because the channel is full, will try next interval", zap.String("msgId", pe.msgId))
pe.submitPending = false
pe.updTime = time.Time{}
}
}
// clearSubmitPendingFlags is called after a batch has finished being submitted (success or failure). It clears the submit pending flag for everything in the batch.
func (acct *Accountant) clearSubmitPendingFlags(msgs []*common.MessagePublication) {
acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock()
for _, msg := range msgs {
if pe, exists := acct.pendingTransfers[msg.MessageIDString()]; exists {
pe.submitPending = false
}
}
}

View File

@ -16,6 +16,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/devnet"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -29,6 +30,7 @@ func newAccountantForTest(
t *testing.T,
ctx context.Context,
accountantCheckEnabled bool,
obsvReqWriteC chan<- *gossipv1.ObservationRequest,
acctWriteC chan<- *common.MessagePublication,
) *Accountant {
logger := zap.NewNop()
@ -44,6 +46,7 @@ func newAccountantForTest(
ctx,
logger,
&db,
obsvReqWriteC,
"0xdeadbeef", // accountantContract
"none", // accountantWS
nil, // wormchainConn
@ -100,8 +103,9 @@ func buildMockTransferPayloadBytes(
func TestVaaFromUninterestingEmitter(t *testing.T) {
ctx := context.Background()
obsvReqWriteC := make(chan *gossipv1.ObservationRequest, 10)
acctChan := make(chan *common.MessagePublication, 10)
acct := newAccountantForTest(t, ctx, enforceAccountant, acctChan)
acct := newAccountantForTest(t, ctx, enforceAccountant, obsvReqWriteC, acctChan)
require.NotNil(t, acct)
emitterAddr, _ := vaa.StringToAddress("0x00")
@ -126,8 +130,9 @@ func TestVaaFromUninterestingEmitter(t *testing.T) {
func TestVaaForUninterestingPayloadType(t *testing.T) {
ctx := context.Background()
obsvReqWriteC := make(chan *gossipv1.ObservationRequest, 10)
acctChan := make(chan *common.MessagePublication, 10)
acct := newAccountantForTest(t, ctx, enforceAccountant, acctChan)
acct := newAccountantForTest(t, ctx, enforceAccountant, obsvReqWriteC, acctChan)
require.NotNil(t, acct)
emitterAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
@ -152,8 +157,9 @@ func TestVaaForUninterestingPayloadType(t *testing.T) {
func TestInterestingTransferShouldNotBeBlockedWhenNotEnforcingAccountant(t *testing.T) {
ctx := context.Background()
obsvReqWriteC := make(chan *gossipv1.ObservationRequest, 10)
acctChan := make(chan *common.MessagePublication, 10)
acct := newAccountantForTest(t, ctx, dontEnforceAccountant, acctChan)
acct := newAccountantForTest(t, ctx, dontEnforceAccountant, obsvReqWriteC, acctChan)
require.NotNil(t, acct)
emitterAddr, _ := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")
@ -194,8 +200,9 @@ func TestInterestingTransferShouldNotBeBlockedWhenNotEnforcingAccountant(t *test
func TestInterestingTransferShouldBeBlockedWhenEnforcingAccountant(t *testing.T) {
ctx := context.Background()
obsvReqWriteC := make(chan *gossipv1.ObservationRequest, 10)
acctChan := make(chan *common.MessagePublication, 10)
acct := newAccountantForTest(t, ctx, enforceAccountant, acctChan)
acct := newAccountantForTest(t, ctx, enforceAccountant, obsvReqWriteC, acctChan)
require.NotNil(t, acct)
emitterAddr, _ := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")

View File

@ -0,0 +1,292 @@
// This code audits the set of pending transfers against the state reported by the smart contract. It is called from the processor every minute,
// but the audit is performed less frequently. The audit occurs in two phases that operate off of a temporary map of all pending transfers known to this guardian.
//
// The first phase involves querying the smart contract for any observations that it thinks are missing for this guardian. The audit processes everything in the
// returned results and does one of the following:
// - If the observation is in our temporary map, we resubmit an observation to the contract and delete it from our temporary map.
// - If the observation is not in the temporary map, we request a reobservation from the local watcher.
//
// The second phase consists of requesting the status from the contract for everything that is still in the temporary map. For each returned item, we do the following:
// - If the contract indicates that the transfer has been committed, we validate the digest, then publish it and delete it from the map.
// - If the contract indicates that the transfer is pending, we continue to wait for it.
// - If the contract indicates any other status (most likely meaning it does not know about it), we resubmit an observation to the contract.
//
// Note that any time we are considering resubmitting an observation to the contract, we first check the "submit pending" flag. If that is set, we do not
// submit the observation to the contract, but continue to wait for it to work its way through the queue.
package accountant
import (
"encoding/hex"
"encoding/json"
"fmt"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
cosmossdk "github.com/cosmos/cosmos-sdk/types"
ethCommon "github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
const (
// auditInterval indicates how often the audit runs (given that it is invoked by the processor once per minute)
auditInterval = 30 * time.Second // TODO: Make this something like five minutes?
// maxSubmitPendingTime indicates how long a transfer can be in the pending state before the audit starts complaining about it.
maxSubmitPendingTime = 10 * time.Minute
)
type (
// MissingObservationsResponse is the result returned from the "missing_observations" query.
MissingObservationsResponse struct {
Missing []MissingObservation
}
MissingObservation struct {
ChainId uint16 `json:"chain_id"`
TxHash []byte `json:"tx_hash"`
}
// BatchTransferStatusResponse is the result returned by the "batch_transfer_status" query.
BatchTransferStatusResponse struct {
Details []TransferDetails `json:"details"`
}
TransferDetails struct {
Key TransferKey
Status TransferStatus
}
TransferStatus struct {
Committed *TransferStatusCommitted `json:"committed"`
Pending *TransferStatusPending `json:"pending"`
}
TransferStatusCommitted struct {
Data TransferData `json:"data"`
Digest []byte `json:"digest"`
}
TransferData struct {
Amount *cosmossdk.Int `json:"amount"`
TokenChain uint16 `json:"token_chain"`
TokenAddress vaa.Address `json:"token_address"`
RecipientChain uint16 `json:"recipient_chain"`
}
TransferStatusPending struct {
}
)
// makeAuditKey creates an audit map key from a missing observation.
func (mo *MissingObservation) makeAuditKey() string {
return fmt.Sprintf("%d-%s", mo.ChainId, hex.EncodeToString(mo.TxHash[:]))
}
// makeAuditKey creates an audit map key from a pending observation entry.
func (pe *pendingEntry) makeAuditKey() string {
return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, pe.msg.TxHash.String())
}
// AuditPendingTransfers is the entry point for the audit of the pending transfer map. It determines if it has been long enough since the last audit.
// If so, it creates a temporary map of all pending transfers and invokes the main audit function as a go routine.
func (acct *Accountant) AuditPendingTransfers() {
acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock()
if time.Since(acct.lastAuditTime) < auditInterval {
acct.logger.Debug("acctaudit: in AuditPendingTransfers, not time to run yet", zap.Stringer("lastAuditTime", acct.lastAuditTime))
return
}
tmpMap := make(map[string]*pendingEntry)
for _, pe := range acct.pendingTransfers {
if (pe.submitPending) && (time.Since(pe.updTime) > maxSubmitPendingTime) {
auditErrors.Inc()
acct.logger.Error("acctaudit: transfer has been in the submit pending state for too long", zap.Stringer("lastUpdateTime", pe.updTime))
}
acct.logger.Debug("acctaudit: will audit pending transfer", zap.String("msgId", pe.msgId), zap.Stringer("lastUpdateTime", pe.updTime))
tmpMap[pe.makeAuditKey()] = pe
}
acct.logger.Debug("acctaudit: in AuditPendingTransfers: starting audit", zap.Int("numPending", len(tmpMap)))
acct.lastAuditTime = time.Now()
go acct.performAudit(tmpMap)
acct.logger.Debug("acctaudit: leaving AuditPendingTransfers")
}
// performAudit audits the temporary map against the smart contract. It is meant to be run in a go routine. It takes a temporary map of all pending transfers
// and validates that against what is reported by the smart contract. For more details, please see the prologue of this file.
func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
acct.logger.Debug("acctaudit: entering performAudit")
missingObservations, err := acct.queryMissingObservations()
if err != nil {
acct.logger.Error("acctaudit: unable to perform audit, failed to query missing observations", zap.Error(err))
for _, pe := range tmpMap {
acct.logger.Error("acctaudit: unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId))
}
return
}
if len(missingObservations) != 0 {
for _, mo := range missingObservations {
key := mo.makeAuditKey()
pe, exists := tmpMap[key]
if exists {
if !pe.submitPending {
auditErrors.Inc()
acct.logger.Error("acctaudit: contract reported pending observation as missing, resubmitting it", zap.String("msgID", pe.msgId))
acct.submitObservation(pe)
} else {
acct.logger.Info("acctaudit: contract reported pending observation as missing but it is queued up to be submitted, skipping it", zap.String("msgID", pe.msgId))
}
delete(tmpMap, key)
} else {
acct.handleMissingObservation(mo)
}
}
}
if len(tmpMap) != 0 {
var keys []TransferKey
var pendingTransfers []*pendingEntry
for _, pe := range tmpMap {
keys = append(keys, TransferKey{EmitterChain: uint16(pe.msg.EmitterChain), EmitterAddress: pe.msg.EmitterAddress, Sequence: pe.msg.Sequence})
pendingTransfers = append(pendingTransfers, pe)
}
transferDetails, err := acct.queryBatchTransferStatus(keys)
if err != nil {
acct.logger.Error("acctaudit: unable to finish audit, failed to query for transfer statuses", zap.Error(err))
for _, pe := range tmpMap {
acct.logger.Error("acctaudit: unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId))
}
return
}
for _, pe := range pendingTransfers {
item, exists := transferDetails[pe.msgId]
if !exists {
if !pe.submitPending {
auditErrors.Inc()
acct.logger.Error("acctaudit: query did not return status for transfer, this should not happen, resubmitting it", zap.String("msgId", pe.msgId))
acct.submitObservation(pe)
} else {
acct.logger.Debug("acctaudit: query did not return status for transfer we have not submitted yet, ignoring it", zap.String("msgId", pe.msgId))
}
continue
}
if item.Status.Committed != nil {
digest := hex.EncodeToString(item.Status.Committed.Digest)
if pe.digest == digest {
acct.logger.Info("acctaudit: audit determined that transfer has been committed, publishing it", zap.String("msgId", pe.msgId))
acct.handleCommittedTransfer(pe.msgId)
} else {
digestMismatches.Inc()
acct.logger.Error("acctaudit: audit detected a digest mismatch, dropping transfer", zap.String("msgId", pe.msgId), zap.String("ourDigest", pe.digest), zap.String("reportedDigest", digest))
acct.deletePendingTransfer(pe.msgId)
}
} else if item.Status.Pending != nil {
acct.logger.Debug("acctaudit: contract says transfer is still pending", zap.String("msgId", pe.msgId))
} else if !pe.submitPending {
auditErrors.Inc()
acct.logger.Error("acctaudit: contract does not know about pending transfer, resubmitting it", zap.String("msgId", pe.msgId))
acct.submitObservation(pe)
}
}
}
acct.logger.Debug("acctaudit: exiting performAudit")
}
// handleMissingObservation submits a reobservation request if appropriate.
func (acct *Accountant) handleMissingObservation(mo MissingObservation) {
// It's possible we received this transfer after we built the temporary map. If so, we don't want to do a reobservation.
if acct.transferNowExists(mo) {
acct.logger.Debug("acctaudit: contract reported unknown observation as missing but it is now in our pending map, ignoring it", zap.Uint16("chainId", mo.ChainId), zap.String("txHash", hex.EncodeToString(mo.TxHash)))
return
}
acct.logger.Debug("acctaudit: contract reported unknown observation as missing, requesting reobservation", zap.Uint16("chainId", mo.ChainId), zap.String("txHash", hex.EncodeToString(mo.TxHash)))
msg := &gossipv1.ObservationRequest{ChainId: uint32(mo.ChainId), TxHash: mo.TxHash}
acct.obsvReqWriteC <- msg
}
// transferNowExists checks to see if a missed observation exists in the pending transfer map. It grabs the lock.
func (acct *Accountant) transferNowExists(mo MissingObservation) bool {
acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock()
chainId := vaa.ChainID(mo.ChainId)
txHash := ethCommon.BytesToHash(mo.TxHash)
for _, pe := range acct.pendingTransfers {
if (pe.msg.EmitterChain == chainId) && (pe.msg.TxHash == txHash) {
return true
}
}
return false
}
// queryMissingObservations queries the contract for the set of observations it thinks are missing for this guardian.
func (acct *Accountant) queryMissingObservations() ([]MissingObservation, error) {
gs := acct.gst.Get()
if gs == nil {
return nil, fmt.Errorf("failed to get guardian set")
}
guardianIndex, found := gs.KeyIndex(acct.guardianAddr)
if !found {
return nil, fmt.Errorf("failed to get guardian index")
}
query := fmt.Sprintf(`{"missing_observations":{"guardian_set": %d, "index": %d}}`, gs.Index, guardianIndex)
acct.logger.Debug("acctaudit: submitting missing_observations query", zap.String("query", query))
resp, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
if err != nil {
return nil, fmt.Errorf("missing_observations query failed: %w", err)
}
var ret MissingObservationsResponse
if err := json.Unmarshal(resp.Data, &ret); err != nil {
return nil, fmt.Errorf("failed to parse missing_observations response: %w", err)
}
acct.logger.Debug("acctaudit: missing_observations query response", zap.Int("numEntries", len(ret.Missing)), zap.String("result", string(resp.Data)))
return ret.Missing, nil
}
// queryBatchTransferStatus queries the status of the specified transfers and returns a map from transfer key (as a string) to the transfer details.
func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string]TransferDetails, error) {
bytes, err := json.Marshal(keys)
if err != nil {
return nil, fmt.Errorf("failed to marshal keys: %w", err)
}
query := fmt.Sprintf(`{"batch_transfer_status":%s}`, string(bytes))
acct.logger.Debug("acctaudit: submitting batch_transfer_status query", zap.String("query", query))
resp, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
if err != nil {
return nil, fmt.Errorf("batch_transfer_status query failed: %w", err)
}
var response BatchTransferStatusResponse
if err := json.Unmarshal(resp.Data, &response); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
ret := make(map[string]TransferDetails)
for _, item := range response.Details {
ret[item.Key.String()] = item
}
acct.logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(resp.Data)))
return ret, nil
}
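For reference, the two smart-contract queries built above serialize to JSON payloads along these lines (the guardian set index, guardian index, and transfer key values are illustrative, and the hex form of emitter_address assumes vaa.Address marshals to the same string form that the tests parse):

{"missing_observations":{"guardian_set": 3, "index": 5}}
{"batch_transfer_status":[{"emitter_chain":2,"emitter_address":"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16","sequence":1674568234}]}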

View File

@ -52,4 +52,9 @@ var (
Name: "global_accountant_connection_errors_total",
Help: "Total number of connection errors on accountant",
})
auditErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "global_accountant_audit_errors_total",
Help: "Total number of audit errors detected by accountant",
})
)

View File

@ -0,0 +1,74 @@
package accountant
import (
"encoding/hex"
"encoding/json"
"reflect"
"testing"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
cosmossdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseMissingObservationsResponse(t *testing.T) {
//TODO: Write this test once we get a sample response.
}
func TestParseBatchTransferStatusResponse(t *testing.T) {
responsesJson := []byte("{\"details\":[{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674568234},\"status\":{\"committed\":{\"data\":{\"amount\":\"1000000000000000000\",\"token_chain\":2,\"token_address\":\"0000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a\",\"recipient_chain\":4},\"digest\":\"1nbbff/7/ai9GJUs4h2JymFuO4+XcasC6t05glXc99M=\"}}},{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674484597},\"status\":null}]}")
var response BatchTransferStatusResponse
err := json.Unmarshal(responsesJson, &response)
require.NoError(t, err)
require.Equal(t, 2, len(response.Details))
expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
expectedTokenAddress, err := vaa.StringToAddress("0000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a")
require.NoError(t, err)
expectedAmount0 := cosmossdk.NewInt(1000000000000000000)
expectedDigest0, err := hex.DecodeString("d676db7dfffbfda8bd18952ce21d89ca616e3b8f9771ab02eadd398255dcf7d3")
require.NoError(t, err)
expectedResult0 := TransferDetails{
Key: TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1674568234,
},
Status: TransferStatus{
Committed: &TransferStatusCommitted{
Data: TransferData{
Amount: &expectedAmount0,
TokenChain: uint16(vaa.ChainIDEthereum),
TokenAddress: expectedTokenAddress,
RecipientChain: uint16(vaa.ChainIDBSC),
},
Digest: expectedDigest0,
},
},
}
expectedResult1 := TransferDetails{
Key: TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1674484597,
},
Status: TransferStatus{},
}
require.NotNil(t, response.Details[0].Status.Committed)
require.Nil(t, response.Details[0].Status.Pending)
// Use DeepEqual() because the response contains pointers.
assert.True(t, reflect.DeepEqual(expectedResult0, response.Details[0]))
assert.True(t, reflect.DeepEqual(expectedResult1, response.Details[1]))
}

View File

@ -52,13 +52,15 @@ func (acct *Accountant) handleBatch(ctx context.Context) error {
return fmt.Errorf("failed to read messages from `acct.subChan`: %w", err)
}
msgs = acct.removeCompleted(msgs)
if len(msgs) == 0 {
return nil
}
gs := acct.gst.Get()
if gs == nil {
return fmt.Errorf("failed to get guardian set: %w", err)
return fmt.Errorf("failed to get guardian set")
}
guardianIndex, found := gs.KeyIndex(acct.guardianAddr)
@ -86,7 +88,26 @@ func readFromChannel[T any](ctx context.Context, ch <-chan T, count int) ([]T, e
return out, nil
}
// removeCompleted drops any messages that are no longer in the pending transfer map. This is to handle the case where the contract reports
// that a transfer is committed while it is in the channel. There is no point in submitting the observation once the transfer is committed.
func (acct *Accountant) removeCompleted(msgs []*common.MessagePublication) []*common.MessagePublication {
out := make([]*common.MessagePublication, 0, len(msgs))
for _, msg := range msgs {
if _, exists := acct.pendingTransfers[msg.MessageIDString()]; exists {
out = append(out, msg)
}
}
return out
}
type (
TransferKey struct {
EmitterChain uint16 `json:"emitter_chain"`
EmitterAddress vaa.Address `json:"emitter_address"`
Sequence uint64 `json:"sequence"`
}
SubmitObservationsMsg struct {
Params SubmitObservationsParams `json:"submit_observations"`
}
@ -139,23 +160,17 @@ type (
ObservationResponses []ObservationResponse
ObservationResponse struct {
Key ObservationKey
Key TransferKey
Status ObservationResponseStatus
}
ObservationKey struct {
EmitterChain uint16 `json:"emitter_chain"`
EmitterAddress vaa.Address `json:"emitter_address"`
Sequence uint64 `json:"sequence"`
}
ObservationResponseStatus struct {
Type string `json:"type"`
Data string `json:"data"`
}
)
func (k ObservationKey) String() string {
func (k TransferKey) String() string {
return fmt.Sprintf("%v/%v/%v", k.EmitterChain, hex.EncodeToString(k.EmitterAddress[:]), k.Sequence)
}
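For example, with the Ethereum emitter address and sequence from the batch_transfer_status test above, a TransferKey renders as:

2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/1674568234

which matches the emitterChain/emitterAddr/seqNo message ID format used as the pendingTransfers map key.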
@ -173,6 +188,7 @@ func (sb SignatureBytes) MarshalJSON() ([]byte, error) {
// It should be called from a go routine because it can block.
func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePublication, gsIndex uint32, guardianIndex uint32) {
txResp, err := SubmitObservationsToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msgs)
acct.clearSubmitPendingFlags(msgs)
if err != nil {
// This means the whole batch failed. They will all get retried the next audit cycle.
acct.logger.Error("acct: failed to submit any observations in batch", zap.Int("numMsgs", len(msgs)), zap.Error(err))
@ -240,11 +256,11 @@ func (acct *Accountant) handleCommittedTransfer(msgId string) {
defer acct.pendingTransfersLock.Unlock()
pe, exists := acct.pendingTransfers[msgId]
if exists {
acct.logger.Info("acct: transfer has already been committed, publishing it", zap.String("msgId", msgId))
acct.logger.Info("acct: transfer has been committed, publishing it", zap.String("msgId", msgId))
acct.publishTransferAlreadyLocked(pe)
transfersApproved.Inc()
} else {
acct.logger.Debug("acct: transfer has already been committed but it is no longer in our map", zap.String("msgId", msgId))
acct.logger.Debug("acct: transfer has been committed but it is no longer in our map", zap.String("msgId", msgId))
}
}
@ -339,6 +355,7 @@ func SubmitObservationsToContract(
zap.Uint32("gsIndex", gsIndex), zap.Uint32("guardianIndex", guardianIndex),
)
start := time.Now()
txResp, err := wormchainConn.SignAndBroadcastTx(ctx, &subMsg)
if err != nil {
return txResp, fmt.Errorf("failed to send broadcast: %w", err)
@ -364,6 +381,7 @@ func SubmitObservationsToContract(
return txResp, fmt.Errorf("failed to submit observations: %s", txResp.TxResponse.RawLog)
}
logger.Info("acct: done sending broadcast", zap.Int("numObs", len(obs)), zap.Int64("gasUsed", txResp.TxResponse.GasUsed), zap.Stringer("elapsedTime", time.Since(start)))
logger.Debug("acct: in SubmitObservationsToContract, done sending broadcast", zap.String("resp", wormchainConn.BroadcastTxResponseToString(txResp)))
return txResp, nil
}

View File

@ -14,14 +14,14 @@ import (
func TestParseObservationResponseDataKey(t *testing.T) {
dataJson := []byte("{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1673978163}")
var key ObservationKey
var key TransferKey
err := json.Unmarshal(dataJson, &key)
require.NoError(t, err)
expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
expectedResult := ObservationKey{
expectedResult := TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1673978163,
@ -40,7 +40,7 @@ func TestParseObservationResponseData(t *testing.T) {
require.NoError(t, err)
expectedResult0 := ObservationResponse{
Key: ObservationKey{
Key: TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1674061268,
@ -51,7 +51,7 @@ func TestParseObservationResponseData(t *testing.T) {
}
expectedResult1 := ObservationResponse{
Key: ObservationKey{
Key: TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1674061267,

View File

@ -117,8 +117,8 @@ type (
// WasmObservationError represents an error event from the smart contract.
WasmObservationError struct {
Key ObservationKey `json:"key"`
Error string `json:"error"`
Key TransferKey `json:"key"`
Error string `json:"error"`
}
)
@ -127,7 +127,7 @@ func parseEvent[T any](logger *zap.Logger, event tmAbci.Event, name string, cont
for _, attr := range event.Attributes {
if string(attr.Key) == "_contract_address" {
if string(attr.Value) != contractAddress {
return nil, fmt.Errorf("wasm-Observation event from unexpected contract: %s", string(attr.Value))
return nil, fmt.Errorf("%s event from unexpected contract: %s", name, string(attr.Value))
}
} else {
logger.Debug("acctwatcher: event attribute", zap.String("event", name), zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value)))
@ -137,12 +137,12 @@ func parseEvent[T any](logger *zap.Logger, event tmAbci.Event, name string, cont
attrBytes, err := json.Marshal(attrs)
if err != nil {
return nil, fmt.Errorf("failed to marshal wasm-Observation event attributes: %w", err)
return nil, fmt.Errorf("failed to marshal %s event attributes: %w", name, err)
}
evt := new(T)
if err := json.Unmarshal(attrBytes, evt); err != nil {
return nil, fmt.Errorf("failed to unmarshal wasm-Observation event: %w", err)
return nil, fmt.Errorf("failed to unmarshal %s event: %w", name, err)
}
return evt, nil

View File

@ -100,7 +100,7 @@ func TestParseWasmObservationError(t *testing.T) {
require.NoError(t, err)
expectedResult := WasmObservationError{
Key: ObservationKey{
Key: TransferKey{
EmitterChain: uint16(vaa.ChainIDEthereum),
EmitterAddress: expectedEmitterAddress,
Sequence: 1674144545,

View File

@ -0,0 +1,19 @@
package wormconn
import (
"context"
"fmt"
wasmdtypes "github.com/CosmWasm/wasmd/x/wasm/types"
)
// SubmitQuery submits a query to a smart contract and returns the result.
func (c *ClientConn) SubmitQuery(ctx context.Context, contractAddress string, query []byte) (*wasmdtypes.QuerySmartContractStateResponse, error) {
req := wasmdtypes.QuerySmartContractStateRequest{Address: contractAddress, QueryData: query}
qc := wasmdtypes.NewQueryClient(c.c)
if qc == nil {
return nil, fmt.Errorf("failed to create query client")
}
return qc.SmartContractState(ctx, &req)
}
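A minimal usage sketch (the function and parameter names here are illustrative and not part of this commit; it mirrors how queryMissingObservations in the accountant audit code above drives this helper):

// queryMissingExample shows the SubmitQuery call pattern used by the accountant audit.
// It is written as if it lived in package wormconn; callers unmarshal the returned JSON themselves.
func queryMissingExample(ctx context.Context, c *ClientConn, contract string, gsIndex, guardianIndex uint32) ([]byte, error) {
	query := fmt.Sprintf(`{"missing_observations":{"guardian_set": %d, "index": %d}}`, gsIndex, guardianIndex)
	resp, err := c.SubmitQuery(ctx, contract, []byte(query))
	if err != nil {
		return nil, fmt.Errorf("missing_observations query failed: %w", err)
	}
	// resp.Data holds the contract's raw JSON response.
	return resp.Data, nil
}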