diff --git a/node/hack/accountant/send_obs.go b/node/hack/accountant/send_obs.go index f138cd222..89a9a86a8 100644 --- a/node/hack/accountant/send_obs.go +++ b/node/hack/accountant/send_obs.go @@ -1,5 +1,5 @@ -// This tool can be used to confirm that the CoinkGecko price query still works after the token list is updated. -// Usage: go run check_query.go +// This tool can be used to send various observations to the accounting smart contract. +// It is meant for testing purposes only. package main @@ -18,6 +18,8 @@ import ( "github.com/certusone/wormhole/node/pkg/wormconn" "github.com/wormhole-foundation/wormhole/sdk/vaa" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" + ethCrypto "github.com/ethereum/go-ethereum/crypto" "golang.org/x/crypto/openpgp/armor" //nolint @@ -30,23 +32,6 @@ func main() { ctx := context.Background() logger, _ := zap.NewDevelopment() - // data, err := hex.DecodeString("C3AE4256EAA0BA6D01041585F63AE7CAA69D6D33") - // if err != nil { - // logger.Fatal("failed to hex decode string", zap.Error(err)) - // } - - // conv, err := bech32.ConvertBits(data, 8, 5, true) - // if err != nil { - // logger.Fatal("failed to convert bits", zap.Error(err)) - // } - - // encoded, err := bech32.Encode("wormhole", conv) - // if err != nil { - // logger.Fatal("bech32 encode failed", zap.Error(err)) - // } - // logger.Info("encoded", zap.String("str", encoded)) - // return - wormchainURL := string("localhost:9090") wormchainKeyPath := string("./dev.wormchain.key") contract := "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh" @@ -77,7 +62,7 @@ func main() { sequence := uint64(time.Now().Unix()) timestamp := time.Now() - if !testSubmit(ctx, logger, gk, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, false, false, "Submit should succeed") { + if !testSubmit(ctx, logger, gk, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, true, false, "Submit should succeed") { return } @@ -85,10 +70,27 @@ func main() { return } - sequence += 1 + sequence += 10 if !testSubmit(ctx, logger, gk, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c17", timestamp, sequence, false, true, "Bad emitter address should fail") { return } + + sequence += 10 + if !testBatch(ctx, logger, gk, wormchainConn, contract, timestamp, sequence) { + return + } + + sequence += 10 + if !testBatchWithcommitted(ctx, logger, gk, wormchainConn, contract, timestamp, sequence) { + return + } + + sequence += 10 + if !testBatchWithDigestError(ctx, logger, gk, wormchainConn, contract, timestamp, sequence) { + return + } + + logger.Info("Success! All tests passed!") } func testSubmit( @@ -107,8 +109,6 @@ func testSubmit( EmitterAddress, _ := vaa.StringToAddress(emitterAddressStr) TxHash, _ := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") - gsIndex := uint32(0) - guardianIndex := uint32(0) msg := common.MessagePublication{ TxHash: TxHash, @@ -121,36 +121,36 @@ func testSubmit( Payload: Payload, } - txResp, err := accountant.SubmitObservationToContract(ctx, logger, gk, gsIndex, guardianIndex, wormchainConn, contract, &msg) + msgs := []*common.MessagePublication{&msg} + txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) if err != nil { logger.Error("acct: failed to broadcast Observation request", zap.String("test", tag), zap.Error(err)) return false } - // out, err := wormchainConn.BroadcastTxResponseToString(txResp) - // if err != nil { - // logger.Error("acct: failed to parse broadcast response", zap.Error(err)) - // return false - // } - - alreadyCommitted, err := accountant.CheckSubmitObservationResult(txResp) + responses, err := accountant.GetObservationResponses(txResp) if err != nil { - if !errorExpected { - logger.Error("acct: unexpected error", zap.String("test", tag), zap.Error(err)) - return false - } - - logger.Info("test succeeded, expected error returned", zap.String("test", tag), zap.Error(err)) - return true + logger.Error("acct: failed to get responses", zap.Error(err)) + return false } - if alreadyCommitted != expectedResult { - out, err := wormchainConn.BroadcastTxResponseToString(txResp) - if err != nil { - logger.Error("acct: failed to parse broadcast response", zap.String("test", tag), zap.Error(err)) - return false - } - logger.Info("test failed", zap.String("test", tag), zap.Uint64("seqNo", sequence), zap.Bool("alreadyCommitted", alreadyCommitted), zap.String("response", out)) + if len(responses) != len(msgs) { + logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + return false + } + + msgId := msgs[0].MessageIDString() + status, exists := responses[msgId] + if !exists { + logger.Info("test failed: did not receive an observation response for message", zap.String("test", tag), zap.String("msgId", msgId)) + return false + } + + committed := status.Type == "committed" + + if committed != expectedResult { + logger.Info("test failed", zap.String("test", tag), zap.Uint64("seqNo", sequence), zap.Bool("committed", committed), + zap.String("response", wormchainConn.BroadcastTxResponseToString(txResp))) return false } @@ -158,6 +158,298 @@ func testSubmit( return true } +func testBatch( + ctx context.Context, + logger *zap.Logger, + gk *ecdsa.PrivateKey, + wormchainConn *wormconn.ClientConn, + contract string, + timestamp time.Time, + sequence uint64, +) bool { + EmitterAddress, _ := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + TxHash, _ := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") + + nonce := uint32(123456) + + msgs := []*common.MessagePublication{} + + msg1 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: timestamp, + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg1) + + nonce = nonce + 1 + sequence = sequence + 1 + msg2 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: time.Now(), + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg2) + + txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) + if err != nil { + logger.Error("acct: failed to broadcast Observation request", zap.Error(err)) + return false + } + + responses, err := accountant.GetObservationResponses(txResp) + if err != nil { + logger.Error("acct: failed to get responses", zap.Error(err)) + return false + } + + if len(responses) != len(msgs) { + logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + return false + } + + for idx, msg := range msgs { + msgId := msg.MessageIDString() + status, exists := responses[msgId] + if !exists { + logger.Error("acct: did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId)) + return false + } + + if status.Type != "committed" { + logger.Error("acct: unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) + return false + } + } + + logger.Info("test of batch passed.") + return true +} + +func testBatchWithcommitted( + ctx context.Context, + logger *zap.Logger, + gk *ecdsa.PrivateKey, + wormchainConn *wormconn.ClientConn, + contract string, + timestamp time.Time, + sequence uint64, +) bool { + EmitterAddress, _ := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + TxHash, _ := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") + + nonce := uint32(123456) + + msgs := []*common.MessagePublication{} + + logger.Info("submitting a single transfer that should work") + msg1 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: timestamp, + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg1) + + _, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) + if err != nil { + logger.Error("acct: failed to submit initial observation that should work", zap.Error(err)) + return false + } + + logger.Info("submitting a second batch where the second one has already been committed") + msgs = msgs[:0] + + nonce = nonce + 1 + sequence = sequence + 1 + msg2 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: time.Now(), + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg2) + + // Same one we just committed. + msg3 := msg1 + msgs = append(msgs, &msg3) + + txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) + if err != nil { + logger.Error("acct: failed to broadcast Observation request", zap.Error(err)) + return false + } + + responses, err := accountant.GetObservationResponses(txResp) + if err != nil { + logger.Error("acct: failed to get responses", zap.Error(err)) + return false + } + + if len(responses) != len(msgs) { + logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + return false + } + + for idx, msg := range msgs { + msgId := msg.MessageIDString() + status, exists := responses[msgId] + if !exists { + logger.Error("acct: did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId)) + return false + } + + if status.Type != "committed" { + logger.Error("acct: unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) + return false + } + } + + logger.Info("test of batch with already committed passed.") + return true +} + +func testBatchWithDigestError( + ctx context.Context, + logger *zap.Logger, + gk *ecdsa.PrivateKey, + wormchainConn *wormconn.ClientConn, + contract string, + timestamp time.Time, + sequence uint64, +) bool { + EmitterAddress, _ := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + TxHash, _ := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") + + nonce := uint32(123456) + + msgs := []*common.MessagePublication{} + + logger.Info("submitting a single transfer that should work") + msg1 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: timestamp, + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg1) + + _, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) + if err != nil { + logger.Error("acct: failed to submit initial observation that should work", zap.Error(err)) + return false + } + + logger.Info("submitting a second batch where the second one has a digest error") + msgs = msgs[:0] + + nonce = nonce + 1 + sequence = sequence + 1 + msg2 := common.MessagePublication{ + TxHash: TxHash, + Timestamp: time.Now(), + Nonce: nonce, + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + msgs = append(msgs, &msg2) + + // Same key as the message we committed but change the digest. + msg3 := msg1 + msg3.Nonce = msg3.Nonce + 1 + msgs = append(msgs, &msg3) + + txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) + if err != nil { + logger.Error("acct: failed to submit second observation that should work", zap.Error(err)) + return false + } + + responses, err := accountant.GetObservationResponses(txResp) + if err != nil { + logger.Error("acct: failed to get responses", zap.Error(err)) + return false + } + + if len(responses) != len(msgs) { + logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + return false + } + + msgId := msgs[0].MessageIDString() + status, exists := responses[msgId] + if !exists { + logger.Error("acct: did not receive an observation response for message 0", zap.String("msgId", msgId)) + return false + } + + if status.Type != "committed" { + logger.Error("acct: unexpected response on observation for message 0", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) + return false + } + + msgId = msgs[1].MessageIDString() + status, exists = responses[msgId] + if !exists { + logger.Error("acct: did not receive an observation response for message 1", zap.String("msgId", msgId)) + return false + } + + if status.Type != "error" { + logger.Error("acct: unexpected response on observation for message 1", zap.String("status", status.Type), zap.String("text", status.Data)) + return false + } + + if status.Data != "digest mismatch for processed message" { + logger.Error("acct: unexpected error text on observation for message 1", zap.String("text", status.Data)) + return false + } + + logger.Info("test of batch with digest error passed.") + return true +} + +func submit( + ctx context.Context, + logger *zap.Logger, + gk *ecdsa.PrivateKey, + wormchainConn *wormconn.ClientConn, + contract string, + msgs []*common.MessagePublication, +) (*sdktx.BroadcastTxResponse, error) { + gsIndex := uint32(0) + guardianIndex := uint32(0) + + return accountant.SubmitObservationsToContract(ctx, logger, gk, gsIndex, guardianIndex, wormchainConn, contract, msgs) +} + const ( GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY" ) @@ -196,16 +488,3 @@ func loadGuardianKey(filename string) (*ecdsa.PrivateKey, error) { return gk, nil } - -/* -DEBUG: obs: { - key: { - emitter_chain: 2, - emitter_address: 'AAAAAAAAAAAAAAAAApD7FnIIr0VbsTd4AWO3t6mhDBY=', - sequence: 0 - }, - nonce: 0, - payload: 'AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA3gtrOnZAAAAAAAAAAAAAAAAAAALYvmvwuqdOCpBwFmecrpGQ6A3QoAAgAAAAAAAAAAAAAAAMEIIJg/M0Vs576zoEb1qD+jTwJ9DCAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==', - tx_hash: '82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b' -} -*/ diff --git a/node/pkg/accountant/accountant.go b/node/pkg/accountant/accountant.go index 9a3ecf429..3a544edb9 100644 --- a/node/pkg/accountant/accountant.go +++ b/node/pkg/accountant/accountant.go @@ -220,7 +220,7 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool, } // Add it to the pending map and the database. - if err := acct.addPendingTransfer(msgId, msg, digest); err != nil { + if err := acct.addPendingTransferAlreadyLocked(msgId, msg, digest); err != nil { acct.logger.Error("acct: failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err)) return false, err } @@ -254,7 +254,7 @@ func (acct *Accountant) AuditPendingTransfers() { pe.retryCount += 1 if pe.retryCount > maxRetries { acct.logger.Error("acct: stuck pending transfer has reached the retry limit, dropping it", zap.String("msgId", msgId)) - acct.deletePendingTransfer(msgId) + acct.deletePendingTransferAlreadyLocked(msgId) continue } @@ -272,19 +272,19 @@ func (acct *Accountant) AuditPendingTransfers() { acct.logger.Debug("acct: leaving AuditPendingTransfers") } -// publishTransfer publishes a pending transfer to the accountant channel and updates the timestamp. It assumes the caller holds the lock. -func (acct *Accountant) publishTransfer(pe *pendingEntry) { +// publishTransferAlreadyLocked publishes a pending transfer to the accountant channel and updates the timestamp. It assumes the caller holds the lock. +func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) { if acct.enforceFlag { - acct.logger.Debug("acct: publishTransfer: notifying the processor", zap.String("msgId", pe.msgId)) + acct.logger.Debug("acct: publishTransferAlreadyLocked: notifying the processor", zap.String("msgId", pe.msgId)) acct.msgChan <- pe.msg } - acct.deletePendingTransfer(pe.msgId) + acct.deletePendingTransferAlreadyLocked(pe.msgId) } -// addPendingTransfer adds a pending transfer to both the map and the database. It assumes the caller holds the lock. -func (acct *Accountant) addPendingTransfer(msgId string, msg *common.MessagePublication, digest string) error { - acct.logger.Debug("acct: addPendingTransfer", zap.String("msgId", msgId)) +// addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock. +func (acct *Accountant) addPendingTransferAlreadyLocked(msgId string, msg *common.MessagePublication, digest string) error { + acct.logger.Debug("acct: addPendingTransferAlreadyLocked", zap.String("msgId", msgId)) if err := acct.db.AcctStorePendingTransfer(msg); err != nil { return err } @@ -295,8 +295,15 @@ func (acct *Accountant) addPendingTransfer(msgId string, msg *common.MessagePubl return nil } -// deletePendingTransfer deletes the transfer from both the map and the database. It assumes the caller holds the lock. +// deletePendingTransfer deletes the transfer from both the map and the database. It accquires the lock. func (acct *Accountant) deletePendingTransfer(msgId string) { + acct.pendingTransfersLock.Lock() + defer acct.pendingTransfersLock.Unlock() + acct.deletePendingTransferAlreadyLocked(msgId) +} + +// deletePendingTransferAlreadyLocked deletes the transfer from both the map and the database. It assumes the caller holds the lock. +func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) { acct.logger.Debug("acct: deletePendingTransfer", zap.String("msgId", msgId)) if _, exists := acct.pendingTransfers[msgId]; exists { transfersOutstanding.Dec() diff --git a/node/pkg/accountant/accountant_test.go b/node/pkg/accountant/accountant_test.go index 01efa01d2..136b46e4b 100644 --- a/node/pkg/accountant/accountant_test.go +++ b/node/pkg/accountant/accountant_test.go @@ -187,7 +187,7 @@ func TestInterestingTransferShouldNotBeBlockedWhenNotEnforcingAccountant(t *test require.NotNil(t, pe) // PublishTransfer should not publish to the channel but it should remove it from the map. - acct.publishTransfer(pe) + acct.publishTransferAlreadyLocked(pe) assert.Equal(t, 0, len(acct.msgChan)) assert.Equal(t, 0, len(acct.pendingTransfers)) } @@ -236,7 +236,7 @@ func TestInterestingTransferShouldBeBlockedWhenEnforcingAccountant(t *testing.T) require.NotNil(t, pe) // PublishTransfer should publish to the channel and remove it from the map. - acct.publishTransfer(pe) + acct.publishTransferAlreadyLocked(pe) assert.Equal(t, 1, len(acct.msgChan)) assert.Equal(t, 0, len(acct.pendingTransfers)) } diff --git a/node/pkg/accountant/metrics.go b/node/pkg/accountant/metrics.go index 484d546be..dfd64ea1f 100644 --- a/node/pkg/accountant/metrics.go +++ b/node/pkg/accountant/metrics.go @@ -27,6 +27,11 @@ var ( Name: "global_accountant_events_received", Help: "Total number of accountant events received from the smart contract", }) + errorEventsReceived = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "global_accountant_error_events_received", + Help: "Total number of accountant error events received from the smart contract", + }) submitFailures = promauto.NewCounter( prometheus.CounterOpts{ Name: "global_accountant_submit_failures", diff --git a/node/pkg/accountant/submit_obs.go b/node/pkg/accountant/submit_obs.go index b82e9eb98..624430724 100644 --- a/node/pkg/accountant/submit_obs.go +++ b/node/pkg/accountant/submit_obs.go @@ -5,8 +5,10 @@ import ( "crypto/ecdsa" "encoding/hex" "encoding/json" + "errors" "fmt" "strings" + "time" "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/wormconn" @@ -21,31 +23,69 @@ import ( "go.uber.org/zap" ) +// TODO: Arbitrary values. What makes sense? +const batchSize = 10 +const delayInMS = 100 * time.Millisecond + +// worker listens for observation requests from the accountant and submits them to the smart contract. func (acct *Accountant) worker(ctx context.Context) error { for { select { case <-ctx.Done(): return nil - case msg := <-acct.subChan: - gs := acct.gst.Get() - if gs == nil { - acct.logger.Error("acct: unable to send observation request: failed to look up guardian set", zap.String("msgID", msg.MessageIDString())) - continue + default: + if err := acct.handleBatch(ctx); err != nil { + return err } - - guardianIndex, found := gs.KeyIndex(acct.guardianAddr) - if !found { - acct.logger.Error("acct: unable to send observation request: failed to look up guardian index", - zap.String("msgID", msg.MessageIDString()), zap.Stringer("guardianAddr", acct.guardianAddr)) - continue - } - - acct.submitObservationToContract(msg, gs.Index, uint32(guardianIndex)) - transfersSubmitted.Inc() } } } +// handleBatch reads a batch of events from the channel, either until a timeout occurs or the batch is full, +// and submits them to the smart contract. +func (acct *Accountant) handleBatch(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, delayInMS) + defer cancel() + + msgs, err := readFromChannel[*common.MessagePublication](ctx, acct.subChan, batchSize) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("failed to read messages from `acct.subChan`: %w", err) + } + + if len(msgs) == 0 { + return nil + } + + gs := acct.gst.Get() + if gs == nil { + return fmt.Errorf("failed to get guardian set: %w", err) + } + + guardianIndex, found := gs.KeyIndex(acct.guardianAddr) + if !found { + return fmt.Errorf("failed to get guardian index") + } + + acct.submitObservationsToContract(msgs, gs.Index, uint32(guardianIndex)) + transfersSubmitted.Add(float64(len(msgs))) + return nil +} + +// readFromChannel reads events from the channel until a timeout occurs or the batch is full, and returns them. +func readFromChannel[T any](ctx context.Context, ch <-chan T, count int) ([]T, error) { + out := make([]T, 0, count) + for len(out) < count { + select { + case <-ctx.Done(): + return out, ctx.Err() + case msg := <-ch: + out = append(out, msg) + } + } + + return out, nil +} + type ( SubmitObservationsMsg struct { Params SubmitObservationsParams `json:"submit_observations"` @@ -83,7 +123,7 @@ type ( EmitterChain uint16 `json:"emitter_chain"` // The address on the source chain that emitted this message. - EmitterAddress [32]byte `json:"emitter_address"` + EmitterAddress vaa.Address `json:"emitter_address"` // The sequence number of this observation. Sequence uint64 `json:"sequence"` @@ -94,8 +134,31 @@ type ( // The serialized tokenbridge payload. Payload []byte `json:"payload"` } + + // These are used to parse the response data + ObservationResponses []ObservationResponse + + ObservationResponse struct { + Key ObservationKey + Status ObservationResponseStatus + } + + ObservationKey struct { + EmitterChain uint16 `json:"emitter_chain"` + EmitterAddress vaa.Address `json:"emitter_address"` + Sequence uint64 `json:"sequence"` + } + + ObservationResponseStatus struct { + Type string `json:"type"` + Data string `json:"data"` + } ) +func (k ObservationKey) String() string { + return fmt.Sprintf("%v/%v/%v", k.EmitterChain, hex.EncodeToString(k.EmitterAddress[:]), k.Sequence) +} + func (sb SignatureBytes) MarshalJSON() ([]byte, error) { var result string if sb == nil { @@ -106,49 +169,101 @@ func (sb SignatureBytes) MarshalJSON() ([]byte, error) { return []byte(result), nil } -// submitObservationToContract makes a call to the smart contract to submit an observation request. +// submitObservationsToContract makes a call to the smart contract to submit a batch of observation requests. // It should be called from a go routine because it can block. -func (acct *Accountant) submitObservationToContract(msg *common.MessagePublication, gsIndex uint32, guardianIndex uint32) { - msgId := msg.MessageIDString() - acct.logger.Debug("acct: in submitObservationToContract", zap.String("msgID", msgId)) - txResp, err := SubmitObservationToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msg) +func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePublication, gsIndex uint32, guardianIndex uint32) { + txResp, err := SubmitObservationsToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msgs) if err != nil { - acct.logger.Error("acct: failed to submit observation request", zap.String("msgId", msgId), zap.Error(err)) - submitFailures.Inc() - return - } - - alreadyCommitted, err := CheckSubmitObservationResult(txResp) - if err != nil { - submitFailures.Inc() - if strings.Contains(err.Error(), "insufficient balance") { - balanceErrors.Inc() - acct.logger.Error("acct: insufficient balance error detected, dropping transfer", zap.String("msgId", msgId), zap.Error(err)) - acct.pendingTransfersLock.Lock() - defer acct.pendingTransfersLock.Unlock() - acct.deletePendingTransfer(msgId) - } else { - acct.logger.Error("acct: failed to submit observation request", zap.String("msgId", msgId), zap.Error(err)) + // This means the whole batch failed. They will all get retried the next audit cycle. + acct.logger.Error("acct: failed to submit any observations in batch", zap.Int("numMsgs", len(msgs)), zap.Error(err)) + for idx, msg := range msgs { + acct.logger.Error("acct: failed to submit observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) } + + submitFailures.Add(float64(len(msgs))) return } - if alreadyCommitted { - acct.pendingTransfersLock.Lock() - defer acct.pendingTransfersLock.Unlock() - pe, exists := acct.pendingTransfers[msgId] - if exists { - acct.logger.Info("acct: transfer has already been committed, publishing it", zap.String("msgId", msgId)) - acct.publishTransfer(pe) - transfersApproved.Inc() - } else { - acct.logger.Debug("acct: transfer has already been committed, and it is no longer in our map", zap.String("msgId", msgId)) + responses, err := GetObservationResponses(txResp) + if err != nil { + // This means the whole batch failed. They will all get retried the next audit cycle. + acct.logger.Error("acct: failed to get responses from batch", zap.Error(err)) + for idx, msg := range msgs { + acct.logger.Error("acct: need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) + } + + submitFailures.Add(float64(len(msgs))) + return + } + + if len(responses) != len(msgs) { + // This means the whole batch failed. They will all get retried the next audit cycle. + acct.logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + for idx, msg := range msgs { + acct.logger.Error("acct: need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) + } + + submitFailures.Add(float64(len(msgs))) + return + } + + for _, msg := range msgs { + msgId := msg.MessageIDString() + + status, exists := responses[msgId] + if !exists { + // This will get retried next audit interval. + acct.logger.Error("acct: did not receive an observation response for message", zap.String("msgId", msgId)) + submitFailures.Inc() + continue + } + + switch status.Type { + case "pending": + acct.logger.Info("acct: transfer is pending", zap.String("msgId", msgId)) + case "committed": + acct.handleCommittedTransfer(msgId) + case "error": + submitFailures.Inc() + acct.handleTransferError(msgId, status.Data, "acct: transfer failed") + default: + // This will get retried next audit interval. + acct.logger.Error("acct: unexpected status response on observation", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) + submitFailures.Inc() } } } -// SubmitObservationToContract is a free function to make a call to the smart contract to submit an observation request. -func SubmitObservationToContract( +// handleCommittedTransfer updates the pending map and publishes a committed transfer. It grabs the lock. +func (acct *Accountant) handleCommittedTransfer(msgId string) { + acct.pendingTransfersLock.Lock() + defer acct.pendingTransfersLock.Unlock() + pe, exists := acct.pendingTransfers[msgId] + if exists { + acct.logger.Info("acct: transfer has already been committed, publishing it", zap.String("msgId", msgId)) + acct.publishTransferAlreadyLocked(pe) + transfersApproved.Inc() + } else { + acct.logger.Debug("acct: transfer has already been committed but it is no longer in our map", zap.String("msgId", msgId)) + } +} + +// handleTransferError is called when a transfer fails, either from a submit or an event notification. It handles insufficient balance error. It grabs the lock. +func (acct *Accountant) handleTransferError(msgId string, errText string, logText string) { + if strings.Contains(errText, "insufficient balance") { + balanceErrors.Inc() + acct.logger.Error("acct: insufficient balance error detected, dropping transfer", zap.String("msgId", msgId), zap.String("text", errText)) + acct.deletePendingTransfer(msgId) + } else { + // This will get retried next audit interval. + acct.logger.Error(logText, zap.String("msgId", msgId), zap.String("text", errText)) + } +} + +// SubmitObservationsToContract is a free function to make a call to the smart contract to submit an observation request. +// If the submit fails or the result contains an error, it will return the error. If an error is returned, the caller is +// expected to use GetFailedIndexInBatch() to see which observation in the batch failed. +func SubmitObservationsToContract( ctx context.Context, logger *zap.Logger, gk *ecdsa.PrivateKey, @@ -156,10 +271,11 @@ func SubmitObservationToContract( guardianIndex uint32, wormchainConn *wormconn.ClientConn, contract string, - msg *common.MessagePublication, + msgs []*common.MessagePublication, ) (*sdktx.BroadcastTxResponse, error) { - obs := []Observation{ - Observation{ + obs := make([]Observation, len(msgs)) + for idx, msg := range msgs { + obs[idx] = Observation{ TxHash: msg.TxHash.Bytes(), Timestamp: uint32(msg.Timestamp.Unix()), Nonce: msg.Nonce, @@ -168,7 +284,19 @@ func SubmitObservationToContract( Sequence: msg.Sequence, ConsistencyLevel: msg.ConsistencyLevel, Payload: msg.Payload, - }, + } + + logger.Debug("acct: in SubmitObservationsToContract, encoding observation", + zap.Int("idx", idx), + zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])), + zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[idx].Timestamp), + zap.Uint32("nonce", msg.Nonce), zap.Uint32("encNonce", obs[idx].Nonce), + zap.Stringer("emitterChain", msg.EmitterChain), zap.Uint16("encEmitterChain", obs[idx].EmitterChain), + zap.Stringer("emitterAddress", msg.EmitterAddress), zap.String("encEmitterAddress", hex.EncodeToString(obs[idx].EmitterAddress[:])), + zap.Uint64("squence", msg.Sequence), zap.Uint64("encSequence", obs[idx].Sequence), + zap.Uint8("consistencyLevel", msg.ConsistencyLevel), zap.Uint8("encConsistencyLevel", obs[idx].ConsistencyLevel), + zap.String("payload", hex.EncodeToString(msg.Payload)), zap.String("encPayload", hex.EncodeToString(obs[idx].Payload)), + ) } bytes, err := json.Marshal(obs) @@ -205,69 +333,69 @@ func SubmitObservationToContract( Funds: sdktypes.Coins{}, } - logger.Debug("acct: in SubmitObservationToContract, sending broadcast", - zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[0].TxHash[:])), - zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[0].Timestamp), - zap.Uint32("nonce", msg.Nonce), zap.Uint32("encNonce", obs[0].Nonce), - zap.Stringer("emitterChain", msg.EmitterChain), zap.Uint16("encEmitterChain", obs[0].EmitterChain), - zap.Stringer("emitterAddress", msg.EmitterAddress), zap.String("encEmitterAddress", hex.EncodeToString(obs[0].EmitterAddress[:])), - zap.Uint64("squence", msg.Sequence), zap.Uint64("encSequence", obs[0].Sequence), - zap.Uint8("consistencyLevel", msg.ConsistencyLevel), zap.Uint8("encConsistencyLevel", obs[0].ConsistencyLevel), - zap.String("payload", hex.EncodeToString(msg.Payload)), zap.String("encPayload", hex.EncodeToString(obs[0].Payload)), + logger.Debug("acct: in SubmitObservationsToContract, sending broadcast", + zap.Int("numObs", len(obs)), zap.String("observations", string(bytes)), zap.Uint32("gsIndex", gsIndex), zap.Uint32("guardianIndex", guardianIndex), ) txResp, err := wormchainConn.SignAndBroadcastTx(ctx, &subMsg) if err != nil { - logger.Error("acct: SubmitObservationToContract failed to send broadcast", zap.Error(err)) - } else { - if txResp.TxResponse == nil { - return txResp, fmt.Errorf("txResp.TxResponse is nil") - } - if strings.Contains(txResp.TxResponse.RawLog, "out of gas") { - return txResp, fmt.Errorf("out of gas: %s", txResp.TxResponse.RawLog) - } - - out, err := wormchainConn.BroadcastTxResponseToString(txResp) - if err != nil { - logger.Error("acct: SubmitObservationToContract failed to parse broadcast response", zap.Error(err)) - } else { - logger.Debug("acct: in SubmitObservationToContract, done sending broadcast", zap.String("resp", out)) - } + return txResp, fmt.Errorf("failed to send broadcast: %w", err) } - return txResp, err -} -// CheckSubmitObservationResult() is a free function that returns true if the observation has already been committed -// or false if we need to wait for the commit. An error is returned when appropriate. -func CheckSubmitObservationResult(txResp *sdktx.BroadcastTxResponse) (bool, error) { if txResp == nil { - return false, fmt.Errorf("txResp is nil") + return txResp, fmt.Errorf("sent broadcast but returned txResp is nil") } + if txResp.TxResponse == nil { - return false, fmt.Errorf("txResp does not contain a TxResponse") + return txResp, fmt.Errorf("sent broadcast but returned txResp.TxResponse is nil") } + if txResp.TxResponse.RawLog == "" { - return false, fmt.Errorf("RawLog is not set") + return txResp, fmt.Errorf("sent broadcast but raw_log is not set, unable to analyze the result") } - if strings.Contains(txResp.TxResponse.RawLog, "execute wasm contract failed") { - if strings.Contains(txResp.TxResponse.RawLog, "already committed") { - return true, nil - } - - // TODO Need to test this, requires multiple guardians. - if strings.Contains(txResp.TxResponse.RawLog, "duplicate signature") { - return false, nil - } - - return false, fmt.Errorf(txResp.TxResponse.RawLog) + if strings.Contains(txResp.TxResponse.RawLog, "out of gas") { + return txResp, fmt.Errorf("out of gas: %s", txResp.TxResponse.RawLog) } if strings.Contains(txResp.TxResponse.RawLog, "failed to execute message") { - return false, fmt.Errorf(txResp.TxResponse.RawLog) + return txResp, fmt.Errorf("failed to submit observations: %s", txResp.TxResponse.RawLog) } - return false, nil + logger.Debug("acct: in SubmitObservationsToContract, done sending broadcast", zap.String("resp", wormchainConn.BroadcastTxResponseToString(txResp))) + return txResp, nil +} + +// GetObservationResponses is a free function that extracts the observation responses from a transaction response. +// It assumes the transaction response is valid (SubmitObservationsToContract() did not return an error). +func GetObservationResponses(txResp *sdktx.BroadcastTxResponse) (map[string]ObservationResponseStatus, error) { + data, err := hex.DecodeString(txResp.TxResponse.Data) + if err != nil { + return nil, fmt.Errorf("failed to decode data: %w", err) + } + + var msg sdktypes.TxMsgData + if err := msg.Unmarshal([]byte(data)); err != nil { + return nil, fmt.Errorf("failed to unmarshal data: %w", err) + } + + var execContractResp wasmdtypes.MsgExecuteContractResponse + if err := execContractResp.Unmarshal(msg.Data[0].Data); err != nil { + return nil, fmt.Errorf("failed to unmarshal ExecuteContractResponse: %w", err) + } + + var responses ObservationResponses + err = json.Unmarshal(execContractResp.Data, &responses) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal responses: %w", err) + } + + out := make(map[string]ObservationResponseStatus) + for _, resp := range responses { + out[resp.Key.String()] = resp.Status + } + + return out, nil } diff --git a/node/pkg/accountant/submit_obs_test.go b/node/pkg/accountant/submit_obs_test.go new file mode 100644 index 000000000..9a47c8cc0 --- /dev/null +++ b/node/pkg/accountant/submit_obs_test.go @@ -0,0 +1,67 @@ +package accountant + +import ( + // "encoding/hex" + "encoding/json" + "testing" + + "github.com/wormhole-foundation/wormhole/sdk/vaa" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseObservationResponseDataKey(t *testing.T) { + dataJson := []byte("{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1673978163}") + + var key ObservationKey + err := json.Unmarshal(dataJson, &key) + require.NoError(t, err) + + expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + require.NoError(t, err) + + expectedResult := ObservationKey{ + EmitterChain: uint16(vaa.ChainIDEthereum), + EmitterAddress: expectedEmitterAddress, + Sequence: 1673978163, + } + assert.Equal(t, expectedResult, key) +} + +func TestParseObservationResponseData(t *testing.T) { + responsesJson := []byte("[{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674061268},\"status\":{\"type\":\"committed\"}},{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674061267},\"status\":{\"type\":\"error\",\"data\":\"digest mismatch for processed message\"}}]") + var responses ObservationResponses + err := json.Unmarshal(responsesJson, &responses) + require.NoError(t, err) + require.Equal(t, 2, len(responses)) + + expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + require.NoError(t, err) + + expectedResult0 := ObservationResponse{ + Key: ObservationKey{ + EmitterChain: uint16(vaa.ChainIDEthereum), + EmitterAddress: expectedEmitterAddress, + Sequence: 1674061268, + }, + Status: ObservationResponseStatus{ + Type: "committed", + }, + } + + expectedResult1 := ObservationResponse{ + Key: ObservationKey{ + EmitterChain: uint16(vaa.ChainIDEthereum), + EmitterAddress: expectedEmitterAddress, + Sequence: 1674061267, + }, + Status: ObservationResponseStatus{ + Type: "error", + Data: "digest mismatch for processed message", + }, + } + + assert.Equal(t, expectedResult0, responses[0]) + assert.Equal(t, expectedResult1, responses[1]) +} diff --git a/node/pkg/accountant/watcher.go b/node/pkg/accountant/watcher.go index 117aa1e53..8c34a3884 100644 --- a/node/pkg/accountant/watcher.go +++ b/node/pkg/accountant/watcher.go @@ -85,70 +85,76 @@ func (acct *Accountant) handleEvents(ctx context.Context, evts <-chan tmCoreType } for _, event := range tx.Result.Events { - if event.Type == "wasm-Transfer" { - xfer, err := parseWasmTransfer(acct.logger, event, acct.contract) + if event.Type == "wasm-Observation" { + evt, err := parseEvent[WasmObservation](acct.logger, event, "wasm-Observation", acct.contract) if err != nil { - acct.logger.Error("acctwatcher: failed to parse wasm event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event)) + acct.logger.Error("acctwatcher: failed to parse wasm transfer event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event)) continue } eventsReceived.Inc() - acct.processPendingTransfer(xfer) + acct.processPendingTransfer(evt) + } else if event.Type == "wasm-ObservationError" { + evt, err := parseEvent[WasmObservationError](acct.logger, event, "wasm-ObservationError", acct.contract) + if err != nil { + acct.logger.Error("acctwatcher: failed to parse wasm observation error event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event)) + continue + } + + errorEventsReceived.Inc() + acct.handleTransferError(evt.Key.String(), evt.Error, "acct: transfer error event received") } else { - acct.logger.Debug("acctwatcher: ignoring non-transfer event", zap.String("eventType", event.Type)) + acct.logger.Debug("acctwatcher: ignoring uninteresting event", zap.String("eventType", event.Type)) } } } } } -// WasmTransfer represents a transfer event from the smart contract. -type WasmTransfer struct { - TxHashBytes []byte `json:"tx_hash"` - Timestamp uint32 `json:"timestamp"` - Nonce uint32 `json:"nonce"` - EmitterChain uint16 `json:"emitter_chain"` - EmitterAddress vaa.Address `json:"emitter_address"` - Sequence uint64 `json:"sequence"` - ConsistencyLevel uint8 `json:"consistency_level"` - Payload []byte `json:"payload"` -} +type ( + // WasmObservation represents a transfer event from the smart contract. + WasmObservation Observation -// parseWasmTransfer parses transfer events from the smart contract. All other event types are ignored. -func parseWasmTransfer(logger *zap.Logger, event tmAbci.Event, contractAddress string) (*WasmTransfer, error) { - if event.Type != "wasm-Transfer" { - return nil, fmt.Errorf("not a WasmTransfer event: %s", event.Type) + // WasmObservationError represents an error event from the smart contract. + WasmObservationError struct { + Key ObservationKey `json:"key"` + Error string `json:"error"` } +) +func parseEvent[T any](logger *zap.Logger, event tmAbci.Event, name string, contractAddress string) (*T, error) { attrs := make(map[string]json.RawMessage) for _, attr := range event.Attributes { if string(attr.Key) == "_contract_address" { if string(attr.Value) != contractAddress { - return nil, fmt.Errorf("WasmTransfer event from unexpected contract: %s", string(attr.Value)) + return nil, fmt.Errorf("wasm-Observation event from unexpected contract: %s", string(attr.Value)) } } else { - logger.Debug("acctwatcher: attribute", zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value))) + logger.Debug("acctwatcher: event attribute", zap.String("event", name), zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value))) attrs[string(attr.Key)] = attr.Value } } attrBytes, err := json.Marshal(attrs) if err != nil { - return nil, fmt.Errorf("failed to marshal event attributes: %w", err) + return nil, fmt.Errorf("failed to marshal wasm-Observation event attributes: %w", err) } - evt := new(WasmTransfer) + evt := new(T) if err := json.Unmarshal(attrBytes, evt); err != nil { - return nil, fmt.Errorf("failed to unmarshal WasmTransfer event: %w", err) + return nil, fmt.Errorf("failed to unmarshal wasm-Observation event: %w", err) } return evt, nil } -// processPendingTransfer takes a WasmTransfer event, determines if we are expecting it, and if so, publishes it. -func (acct *Accountant) processPendingTransfer(xfer *WasmTransfer) { +// processPendingTransfer takes a WasmObservation event, determines if we are expecting it, and if so, publishes it. +func (acct *Accountant) processPendingTransfer(xfer *WasmObservation) { + acct.pendingTransfersLock.Lock() + defer acct.pendingTransfersLock.Unlock() + acct.logger.Info("acctwatch: transfer event detected", - zap.String("tx_hash", hex.EncodeToString(xfer.TxHashBytes)), + zap.String("tx_hash", hex.EncodeToString(xfer.TxHash)), zap.Uint32("timestamp", xfer.Timestamp), zap.Uint32("nonce", xfer.Nonce), zap.Stringer("emitter_chain", vaa.ChainID(xfer.EmitterChain)), @@ -159,7 +165,7 @@ func (acct *Accountant) processPendingTransfer(xfer *WasmTransfer) { ) msg := &common.MessagePublication{ - TxHash: ethCommon.BytesToHash(xfer.TxHashBytes), + TxHash: ethCommon.BytesToHash(xfer.TxHash), Timestamp: time.Unix(int64(xfer.Timestamp), 0), Nonce: xfer.Nonce, Sequence: xfer.Sequence, @@ -171,9 +177,6 @@ func (acct *Accountant) processPendingTransfer(xfer *WasmTransfer) { msgId := msg.MessageIDString() - acct.pendingTransfersLock.Lock() - defer acct.pendingTransfersLock.Unlock() - pe, exists := acct.pendingTransfers[msgId] if exists { digest := msg.CreateDigest() @@ -185,11 +188,11 @@ func (acct *Accountant) processPendingTransfer(xfer *WasmTransfer) { zap.String("newDigest", digest), ) - acct.deletePendingTransfer(msgId) + acct.deletePendingTransferAlreadyLocked(msgId) return } acct.logger.Info("acctwatch: pending transfer has been approved", zap.String("msgId", msgId)) - acct.publishTransfer(pe) + acct.publishTransferAlreadyLocked(pe) transfersApproved.Inc() } else { // TODO: We could issue a reobservation request here since it looks like other guardians have seen this transfer but we haven't. diff --git a/node/pkg/accountant/watcher_test.go b/node/pkg/accountant/watcher_test.go index 722514e1b..e1091a646 100644 --- a/node/pkg/accountant/watcher_test.go +++ b/node/pkg/accountant/watcher_test.go @@ -15,15 +15,15 @@ import ( "go.uber.org/zap" ) -func TestParseWasmTransferFromTestTool(t *testing.T) { +func TestParseWasmObservationFromTestTool(t *testing.T) { logger := zap.NewNop() - eventJson := []byte("{\"type\":\"wasm-Transfer\",\"attributes\":[{\"key\":\"X2NvbnRyYWN0X2FkZHJlc3M=\",\"value\":\"d29ybWhvbGUxNDY2bmYzenV4cHlhOHE5ZW14dWtkN3ZmdGFmNmg0cHNyMGEwN3NybDV6dzc0emg4NHlqcTRseWptaA==\",\"index\":true},{\"key\":\"dHhfaGFzaA==\",\"value\":\"Imd1b2xOc1hSWnhnd3kwa1NENVJIbmpTMVJaYW8zVGFmdkNabVpucDJYMHM9Ig==\",\"index\":true},{\"key\":\"dGltZXN0YW1w\",\"value\":\"MTY3MjkzMjk5OA==\",\"index\":true},{\"key\":\"bm9uY2U=\",\"value\":\"MA==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9jaGFpbg==\",\"value\":\"Mg==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9hZGRyZXNz\",\"value\":\"IjAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAyOTBmYjE2NzIwOGFmNDU1YmIxMzc3ODAxNjNiN2I3YTlhMTBjMTYi\",\"index\":true},{\"key\":\"c2VxdWVuY2U=\",\"value\":\"MTY3MjkzMjk5OA==\",\"index\":true},{\"key\":\"Y29uc2lzdGVuY3lfbGV2ZWw=\",\"value\":\"MTU=\",\"index\":true},{\"key\":\"dGVzdF9maWVsZA==\",\"value\":\"MTU=\",\"index\":true},{\"key\":\"cGF5bG9hZA==\",\"value\":\"IkFRQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEzZ3RyT25aQUFBQUFBQUFBQUFBQUFBQUFBQUxZdm12d3VxZE9DcEJ3Rm1lY3JwR1E2QTNRb0FBZ0FBQUFBQUFBQUFBQUFBQU1FSUlKZy9NMFZzNTc2em9FYjFxRCtqVHdKOURDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE9PSI=\",\"index\":true}]}") + eventJson := []byte("{\"type\":\"wasm-Observation\",\"attributes\":[{\"key\":\"X2NvbnRyYWN0X2FkZHJlc3M=\",\"value\":\"d29ybWhvbGUxNDY2bmYzenV4cHlhOHE5ZW14dWtkN3ZmdGFmNmg0cHNyMGEwN3NybDV6dzc0emg4NHlqcTRseWptaA==\",\"index\":true},{\"key\":\"dHhfaGFzaA==\",\"value\":\"Imd1b2xOc1hSWnhnd3kwa1NENVJIbmpTMVJaYW8zVGFmdkNabVpucDJYMHM9Ig==\",\"index\":true},{\"key\":\"dGltZXN0YW1w\",\"value\":\"MTY3MjkzMjk5OA==\",\"index\":true},{\"key\":\"bm9uY2U=\",\"value\":\"MA==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9jaGFpbg==\",\"value\":\"Mg==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9hZGRyZXNz\",\"value\":\"IjAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAyOTBmYjE2NzIwOGFmNDU1YmIxMzc3ODAxNjNiN2I3YTlhMTBjMTYi\",\"index\":true},{\"key\":\"c2VxdWVuY2U=\",\"value\":\"MTY3MjkzMjk5OA==\",\"index\":true},{\"key\":\"Y29uc2lzdGVuY3lfbGV2ZWw=\",\"value\":\"MTU=\",\"index\":true},{\"key\":\"dGVzdF9maWVsZA==\",\"value\":\"MTU=\",\"index\":true},{\"key\":\"cGF5bG9hZA==\",\"value\":\"IkFRQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEzZ3RyT25aQUFBQUFBQUFBQUFBQUFBQUFBQUxZdm12d3VxZE9DcEJ3Rm1lY3JwR1E2QTNRb0FBZ0FBQUFBQUFBQUFBQUFBQU1FSUlKZy9NMFZzNTc2em9FYjFxRCtqVHdKOURDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE9PSI=\",\"index\":true}]}") event := tmAbci.Event{} err := json.Unmarshal(eventJson, &event) require.NoError(t, err) - xfer, err := parseWasmTransfer(logger, event, "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh") + xfer, err := parseEvent[WasmObservation](logger, event, "wasm-Observation", "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh") require.NoError(t, err) require.NotNil(t, xfer) @@ -36,8 +36,8 @@ func TestParseWasmTransferFromTestTool(t *testing.T) { expectedPayload, err := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") require.NoError(t, err) - expectedResult := WasmTransfer{ - TxHashBytes: expectedTxHash.Bytes(), + expectedResult := WasmObservation{ + TxHash: expectedTxHash.Bytes(), Timestamp: 1672932998, Nonce: 0, EmitterChain: uint16(vaa.ChainIDEthereum), @@ -49,15 +49,15 @@ func TestParseWasmTransferFromTestTool(t *testing.T) { assert.Equal(t, expectedResult, *xfer) } -func TestParseWasmTransferFromPortalBridge(t *testing.T) { +func TestParseWasmObservationFromPortalBridge(t *testing.T) { logger := zap.NewNop() - eventJson := []byte("{\"type\":\"wasm-Transfer\",\"attributes\":[{\"key\":\"X2NvbnRyYWN0X2FkZHJlc3M=\",\"value\":\"d29ybWhvbGUxNDY2bmYzenV4cHlhOHE5ZW14dWtkN3ZmdGFmNmg0cHNyMGEwN3NybDV6dzc0emg4NHlqcTRseWptaA==\",\"index\":true},{\"key\":\"dHhfaGFzaA==\",\"value\":\"IlovM0x1bklSK0FaWjdRdllqS0dHSDBNZU94M1pIZlR1SHZ6TDAxdm9TcjQ9Ig==\",\"index\":true},{\"key\":\"dGltZXN0YW1w\",\"value\":\"OTUwNw==\",\"index\":true},{\"key\":\"bm9uY2U=\",\"value\":\"NTU0MzAzNzQ0\",\"index\":true},{\"key\":\"ZW1pdHRlcl9jaGFpbg==\",\"value\":\"Mg==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9hZGRyZXNz\",\"value\":\"IjAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAyOTBmYjE2NzIwOGFmNDU1YmIxMzc3ODAxNjNiN2I3YTlhMTBjMTYi\",\"index\":true},{\"key\":\"c2VxdWVuY2U=\",\"value\":\"MQ==\",\"index\":true},{\"key\":\"Y29uc2lzdGVuY3lfbGV2ZWw=\",\"value\":\"MQ==\",\"index\":true},{\"key\":\"cGF5bG9hZA==\",\"value\":\"IkFRQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBSlVDK1FBQUFBQUFBQUFBQUFBQUFBQTNiWlA1R3FSMUc3aWxDQlRuOEpmMEh4ZjZqNEFBZ0FBQUFBQUFBQUFBQUFBQUpENHYycEhueklPclFkRUVhU3c1NVJPcU1uQkFBUUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE9PSI=\",\"index\":true}]}") + eventJson := []byte("{\"type\":\"wasm-Observation\",\"attributes\":[{\"key\":\"X2NvbnRyYWN0X2FkZHJlc3M=\",\"value\":\"d29ybWhvbGUxNDY2bmYzenV4cHlhOHE5ZW14dWtkN3ZmdGFmNmg0cHNyMGEwN3NybDV6dzc0emg4NHlqcTRseWptaA==\",\"index\":true},{\"key\":\"dHhfaGFzaA==\",\"value\":\"IlovM0x1bklSK0FaWjdRdllqS0dHSDBNZU94M1pIZlR1SHZ6TDAxdm9TcjQ9Ig==\",\"index\":true},{\"key\":\"dGltZXN0YW1w\",\"value\":\"OTUwNw==\",\"index\":true},{\"key\":\"bm9uY2U=\",\"value\":\"NTU0MzAzNzQ0\",\"index\":true},{\"key\":\"ZW1pdHRlcl9jaGFpbg==\",\"value\":\"Mg==\",\"index\":true},{\"key\":\"ZW1pdHRlcl9hZGRyZXNz\",\"value\":\"IjAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAyOTBmYjE2NzIwOGFmNDU1YmIxMzc3ODAxNjNiN2I3YTlhMTBjMTYi\",\"index\":true},{\"key\":\"c2VxdWVuY2U=\",\"value\":\"MQ==\",\"index\":true},{\"key\":\"Y29uc2lzdGVuY3lfbGV2ZWw=\",\"value\":\"MQ==\",\"index\":true},{\"key\":\"cGF5bG9hZA==\",\"value\":\"IkFRQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBSlVDK1FBQUFBQUFBQUFBQUFBQUFBQTNiWlA1R3FSMUc3aWxDQlRuOEpmMEh4ZjZqNEFBZ0FBQUFBQUFBQUFBQUFBQUpENHYycEhueklPclFkRUVhU3c1NVJPcU1uQkFBUUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE9PSI=\",\"index\":true}]}") event := tmAbci.Event{} err := json.Unmarshal(eventJson, &event) require.NoError(t, err) - xfer, err := parseWasmTransfer(logger, event, "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh") + xfer, err := parseEvent[WasmObservation](logger, event, "wasm-Observation", "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh") require.NoError(t, err) require.NotNil(t, xfer) @@ -70,8 +70,8 @@ func TestParseWasmTransferFromPortalBridge(t *testing.T) { expectedPayload, err := hex.DecodeString("0100000000000000000000000000000000000000000000000000000002540be400000000000000000000000000ddb64fe46a91d46ee29420539fc25fd07c5fea3e000200000000000000000000000090f8bf6a479f320ead074411a4b0e7944ea8c9c100040000000000000000000000000000000000000000000000000000000000000000") require.NoError(t, err) - expectedResult := WasmTransfer{ - TxHashBytes: expectedTxHash.Bytes(), + expectedResult := WasmObservation{ + TxHash: expectedTxHash.Bytes(), Timestamp: 9507, Nonce: 554303744, EmitterChain: uint16(vaa.ChainIDEthereum), @@ -83,3 +83,30 @@ func TestParseWasmTransferFromPortalBridge(t *testing.T) { assert.Equal(t, expectedResult, *xfer) } + +func TestParseWasmObservationError(t *testing.T) { + logger := zap.NewNop() + + eventJson := []byte("{\"type\":\"wasm-ObservationError\",\"attributes\":[{\"key\":\"X2NvbnRyYWN0X2FkZHJlc3M=\",\"value\":\"d29ybWhvbGUxNDY2bmYzenV4cHlhOHE5ZW14dWtkN3ZmdGFmNmg0cHNyMGEwN3NybDV6dzc0emg4NHlqcTRseWptaA==\",\"index\":true},{\"key\":\"a2V5\",\"value\":\"eyJlbWl0dGVyX2NoYWluIjoyLCJlbWl0dGVyX2FkZHJlc3MiOiIwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMjkwZmIxNjcyMDhhZjQ1NWJiMTM3NzgwMTYzYjdiN2E5YTEwYzE2Iiwic2VxdWVuY2UiOjE2NzQxNDQ1NDV9\",\"index\":true},{\"key\":\"ZXJyb3I=\",\"value\":\"ImRpZ2VzdCBtaXNtYXRjaCBmb3IgcHJvY2Vzc2VkIG1lc3NhZ2Ui\",\"index\":true}]}") + event := tmAbci.Event{} + err := json.Unmarshal(eventJson, &event) + require.NoError(t, err) + + evt, err := parseEvent[WasmObservationError](logger, event, "wasm-ObservationError", "wormhole1466nf3zuxpya8q9emxukd7vftaf6h4psr0a07srl5zw74zh84yjq4lyjmh") + require.NoError(t, err) + require.NotNil(t, evt) + + expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16") + require.NoError(t, err) + + expectedResult := WasmObservationError{ + Key: ObservationKey{ + EmitterChain: uint16(vaa.ChainIDEthereum), + EmitterAddress: expectedEmitterAddress, + Sequence: 1674144545, + }, + Error: "digest mismatch for processed message", + } + + assert.Equal(t, expectedResult, *evt) +} diff --git a/node/pkg/wormconn/clientconn.go b/node/pkg/wormconn/clientconn.go index 6b5890b2e..99c6dc179 100644 --- a/node/pkg/wormconn/clientconn.go +++ b/node/pkg/wormconn/clientconn.go @@ -64,13 +64,17 @@ func (c *ClientConn) Close() { c.c.Close() } -func (c *ClientConn) BroadcastTxResponseToString(txResp *sdktx.BroadcastTxResponse) (string, error) { - out, err := c.encCfg.Marshaler.MarshalJSON(txResp) - if err != nil { - return "", err +func (c *ClientConn) BroadcastTxResponseToString(txResp *sdktx.BroadcastTxResponse) string { + if txResp == nil { + return "txResp is nil" } - return string(out), nil + out, err := c.encCfg.Marshaler.MarshalJSON(txResp) + if err != nil { + panic(fmt.Sprintf("failed to format txResp: %s", err)) + } + + return string(out) } // generateSenderAddress creates the sender address from the private key. It is based on https://pkg.go.dev/github.com/btcsuite/btcutil/bech32#Encode