2023-01-17 05:30:50 -08:00
package accountant
2023-01-16 04:33:01 -08:00
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
2023-01-20 06:15:36 -08:00
"errors"
2023-01-16 04:33:01 -08:00
"fmt"
"strings"
2023-01-20 06:15:36 -08:00
"time"
2023-01-16 04:33:01 -08:00
"github.com/certusone/wormhole/node/pkg/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
ethCrypto "github.com/ethereum/go-ethereum/crypto"
wasmdtypes "github.com/CosmWasm/wasmd/x/wasm/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
"go.uber.org/zap"
)
2023-01-20 06:15:36 -08:00
// TODO: Arbitrary values. What makes sense?
const (
	// batchSize is the maximum number of messages read from the channel and submitted together in one batch.
	batchSize = 10

	// delayInMS is how long a batch read waits for messages before giving up and submitting what it has.
	// Despite the name, it is a time.Duration (100ms), not a bare millisecond count.
	delayInMS = 100 * time.Millisecond
)
// worker listens for observation requests from the accountant and submits them to the smart contract.
2023-01-17 05:30:50 -08:00
func ( acct * Accountant ) worker ( ctx context . Context ) error {
2023-01-16 04:33:01 -08:00
for {
select {
case <- ctx . Done ( ) :
return nil
2023-01-20 06:15:36 -08:00
default :
if err := acct . handleBatch ( ctx ) ; err != nil {
return err
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
}
}
}
2023-01-16 04:33:01 -08:00
2023-01-20 06:15:36 -08:00
// handleBatch reads a batch of events from the channel, either until a timeout occurs or the batch is full,
// and submits them to the smart contract.
func ( acct * Accountant ) handleBatch ( ctx context . Context ) error {
ctx , cancel := context . WithTimeout ( ctx , delayInMS )
defer cancel ( )
2023-01-16 04:33:01 -08:00
2023-01-20 06:15:36 -08:00
msgs , err := readFromChannel [ * common . MessagePublication ] ( ctx , acct . subChan , batchSize )
if err != nil && ! errors . Is ( err , context . DeadlineExceeded ) {
return fmt . Errorf ( "failed to read messages from `acct.subChan`: %w" , err )
}
2023-01-25 11:02:12 -08:00
if len ( msgs ) != 0 {
msgs = acct . removeCompleted ( msgs )
}
2023-01-24 13:03:18 -08:00
2023-01-20 06:15:36 -08:00
if len ( msgs ) == 0 {
return nil
}
gs := acct . gst . Get ( )
if gs == nil {
2023-01-24 13:03:18 -08:00
return fmt . Errorf ( "failed to get guardian set" )
2023-01-20 06:15:36 -08:00
}
guardianIndex , found := gs . KeyIndex ( acct . guardianAddr )
if ! found {
return fmt . Errorf ( "failed to get guardian index" )
}
acct . submitObservationsToContract ( msgs , gs . Index , uint32 ( guardianIndex ) )
transfersSubmitted . Add ( float64 ( len ( msgs ) ) )
return nil
}
// readFromChannel reads events from the channel until a timeout occurs or the batch is full, and returns them.
//
// It returns up to count values in the order they were received. The returned error is:
//   - nil when count values were read,
//   - ctx.Err() when the context finished first (the values read so far are still returned),
//   - a "channel closed" error when ch was closed before the batch filled (the values read so far are still returned).
//
// Without the closed-channel check, a receive on a closed channel succeeds immediately with the zero value, so the
// batch would be padded with zero values (nil pointers for pointer types) and handed to the caller as if it were real data.
func readFromChannel[T any](ctx context.Context, ch <-chan T, count int) ([]T, error) {
	out := make([]T, 0, count)
	for len(out) < count {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case msg, ok := <-ch:
			if !ok {
				return out, errors.New("channel closed")
			}
			out = append(out, msg)
		}
	}
	return out, nil
}
2023-01-24 13:03:18 -08:00
// removeCompleted drops any messages that are no longer in the pending transfer map. This is to handle the case where the contract reports
// that a transfer is committed while it is in the channel. There is no point in submitting the observation once the transfer is committed.
func ( acct * Accountant ) removeCompleted ( msgs [ ] * common . MessagePublication ) [ ] * common . MessagePublication {
2023-01-25 11:02:12 -08:00
acct . pendingTransfersLock . Lock ( )
defer acct . pendingTransfersLock . Unlock ( )
2023-01-24 13:03:18 -08:00
out := make ( [ ] * common . MessagePublication , 0 , len ( msgs ) )
for _ , msg := range msgs {
if _ , exists := acct . pendingTransfers [ msg . MessageIDString ( ) ] ; exists {
out = append ( out , msg )
}
}
return out
}
2023-01-16 04:33:01 -08:00
type (
2023-01-24 13:03:18 -08:00
TransferKey struct {
EmitterChain uint16 ` json:"emitter_chain" `
EmitterAddress vaa . Address ` json:"emitter_address" `
Sequence uint64 ` json:"sequence" `
}
2023-01-16 04:33:01 -08:00
SubmitObservationsMsg struct {
Params SubmitObservationsParams ` json:"submit_observations" `
}
SubmitObservationsParams struct {
// A serialized `Vec<Observation>`. Multiple observations can be submitted together to reduce transaction overhead.
Observations [ ] byte ` json:"observations" `
// The index of the guardian set used to sign the observations.
GuardianSetIndex uint32 ` json:"guardian_set_index" `
// A signature for `observations`.
Signature SignatureType ` json:"signature" `
}
SignatureType struct {
Index uint32 ` json:"index" `
Signature SignatureBytes ` json:"signature" `
}
SignatureBytes [ ] uint8
Observation struct {
// The hash of the transaction on the emitter chain in which the transfer was performed.
TxHash [ ] byte ` json:"tx_hash" `
// Seconds since UNIX epoch.
Timestamp uint32 ` json:"timestamp" `
// The nonce for the transfer.
Nonce uint32 ` json:"nonce" `
// The source chain from which this observation was created.
EmitterChain uint16 ` json:"emitter_chain" `
// The address on the source chain that emitted this message.
2023-01-20 06:15:36 -08:00
EmitterAddress vaa . Address ` json:"emitter_address" `
2023-01-16 04:33:01 -08:00
// The sequence number of this observation.
Sequence uint64 ` json:"sequence" `
// The consistency level requested by the emitter.
ConsistencyLevel uint8 ` json:"consistency_level" `
// The serialized tokenbridge payload.
Payload [ ] byte ` json:"payload" `
}
2023-01-20 06:15:36 -08:00
// These are used to parse the response data
ObservationResponses [ ] ObservationResponse
ObservationResponse struct {
2023-01-24 13:03:18 -08:00
Key TransferKey
2023-01-20 06:15:36 -08:00
Status ObservationResponseStatus
}
ObservationResponseStatus struct {
Type string ` json:"type" `
Data string ` json:"data" `
}
2023-01-16 04:33:01 -08:00
)
2023-01-30 14:43:10 -08:00
// submitObservationPrefix is the prefix passed to vaa.MessageSigningDigest, together with the serialized
// observations, to compute the digest that is signed when submitting observations to the contract.
var submitObservationPrefix = []byte("acct_sub_obsfig_000000000000000000|")
2023-01-24 13:03:18 -08:00
func ( k TransferKey ) String ( ) string {
2023-01-20 06:15:36 -08:00
return fmt . Sprintf ( "%v/%v/%v" , k . EmitterChain , hex . EncodeToString ( k . EmitterAddress [ : ] ) , k . Sequence )
}
2023-01-16 04:33:01 -08:00
// MarshalJSON encodes the signature as a JSON array of decimal byte values, e.g. [1,2,3], instead of the base64
// string encoding/json would produce for a byte slice by default. A nil value encodes as null and an empty,
// non-nil value as []. It never returns an error.
func (sb SignatureBytes) MarshalJSON() ([]byte, error) {
	if sb == nil {
		return []byte("null"), nil
	}

	elems := make([]string, len(sb))
	for i, b := range sb {
		elems[i] = fmt.Sprintf("%d", b)
	}
	return []byte("[" + strings.Join(elems, ",") + "]"), nil
}
2023-01-20 06:15:36 -08:00
// submitObservationsToContract makes a call to the smart contract to submit a batch of observation requests.
2023-01-16 04:33:01 -08:00
// It should be called from a go routine because it can block.
2023-01-20 06:15:36 -08:00
func ( acct * Accountant ) submitObservationsToContract ( msgs [ ] * common . MessagePublication , gsIndex uint32 , guardianIndex uint32 ) {
txResp , err := SubmitObservationsToContract ( acct . ctx , acct . logger , acct . gk , gsIndex , guardianIndex , acct . wormchainConn , acct . contract , msgs )
2023-01-16 04:33:01 -08:00
if err != nil {
2023-01-20 06:15:36 -08:00
// This means the whole batch failed. They will all get retried the next audit cycle.
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "failed to submit any observations in batch" , zap . Int ( "numMsgs" , len ( msgs ) ) , zap . Error ( err ) )
2023-01-20 06:15:36 -08:00
for idx , msg := range msgs {
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "failed to submit observation" , zap . Int ( "idx" , idx ) , zap . String ( "msgId" , msg . MessageIDString ( ) ) )
2023-01-20 06:15:36 -08:00
}
submitFailures . Add ( float64 ( len ( msgs ) ) )
2023-01-25 11:02:12 -08:00
acct . clearSubmitPendingFlags ( msgs )
2023-01-16 04:33:01 -08:00
return
}
2023-01-20 06:15:36 -08:00
responses , err := GetObservationResponses ( txResp )
2023-01-16 04:33:01 -08:00
if err != nil {
2023-01-20 06:15:36 -08:00
// This means the whole batch failed. They will all get retried the next audit cycle.
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "failed to get responses from batch" , zap . Error ( err ) , zap . String ( "txResp" , acct . wormchainConn . BroadcastTxResponseToString ( txResp ) ) )
2023-01-20 06:15:36 -08:00
for idx , msg := range msgs {
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "need to retry observation" , zap . Int ( "idx" , idx ) , zap . String ( "msgId" , msg . MessageIDString ( ) ) )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
submitFailures . Add ( float64 ( len ( msgs ) ) )
2023-01-25 11:02:12 -08:00
acct . clearSubmitPendingFlags ( msgs )
2023-01-16 04:33:01 -08:00
return
}
2023-01-20 06:15:36 -08:00
if len ( responses ) != len ( msgs ) {
// This means the whole batch failed. They will all get retried the next audit cycle.
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "number of responses does not match number of messages" , zap . Int ( "numMsgs" , len ( msgs ) ) , zap . Int ( "numResp" , len ( responses ) ) , zap . Error ( err ) )
2023-01-20 06:15:36 -08:00
for idx , msg := range msgs {
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "need to retry observation" , zap . Int ( "idx" , idx ) , zap . String ( "msgId" , msg . MessageIDString ( ) ) )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
submitFailures . Add ( float64 ( len ( msgs ) ) )
2023-01-25 11:02:12 -08:00
acct . clearSubmitPendingFlags ( msgs )
2023-01-20 06:15:36 -08:00
return
}
for _ , msg := range msgs {
msgId := msg . MessageIDString ( )
status , exists := responses [ msgId ]
if ! exists {
// This will get retried next audit interval.
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "did not receive an observation response for message" , zap . String ( "msgId" , msgId ) )
2023-01-20 06:15:36 -08:00
submitFailures . Inc ( )
continue
}
switch status . Type {
case "pending" :
2023-04-06 11:32:00 -07:00
acct . logger . Info ( "transfer is pending" , zap . String ( "msgId" , msgId ) )
2023-01-20 06:15:36 -08:00
case "committed" :
acct . handleCommittedTransfer ( msgId )
case "error" :
submitFailures . Inc ( )
2023-04-06 11:32:00 -07:00
acct . handleTransferError ( msgId , status . Data , "transfer failed" )
2023-01-20 06:15:36 -08:00
default :
// This will get retried next audit interval.
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "unexpected status response on observation" , zap . String ( "msgId" , msgId ) , zap . String ( "status" , status . Type ) , zap . String ( "text" , status . Data ) )
2023-01-20 06:15:36 -08:00
submitFailures . Inc ( )
}
}
2023-01-25 11:02:12 -08:00
acct . clearSubmitPendingFlags ( msgs )
2023-01-20 06:15:36 -08:00
}
// handleCommittedTransfer updates the pending map and publishes a committed transfer. It grabs the lock.
func ( acct * Accountant ) handleCommittedTransfer ( msgId string ) {
acct . pendingTransfersLock . Lock ( )
defer acct . pendingTransfersLock . Unlock ( )
pe , exists := acct . pendingTransfers [ msgId ]
if exists {
2023-04-06 11:32:00 -07:00
acct . logger . Info ( "transfer has been committed, publishing it" , zap . String ( "msgId" , msgId ) )
2023-01-20 06:15:36 -08:00
acct . publishTransferAlreadyLocked ( pe )
transfersApproved . Inc ( )
} else {
2023-04-06 11:32:00 -07:00
acct . logger . Debug ( "transfer has been committed but it is no longer in our map" , zap . String ( "msgId" , msgId ) )
2023-01-20 06:15:36 -08:00
}
}
// handleTransferError is called when a transfer fails, either from a submit or an event notification. It handles insufficient balance error. It grabs the lock.
func ( acct * Accountant ) handleTransferError ( msgId string , errText string , logText string ) {
if strings . Contains ( errText , "insufficient balance" ) {
balanceErrors . Inc ( )
2023-04-06 11:32:00 -07:00
acct . logger . Error ( "insufficient balance error detected, dropping transfer" , zap . String ( "msgId" , msgId ) , zap . String ( "text" , errText ) )
2023-01-20 06:15:36 -08:00
acct . deletePendingTransfer ( msgId )
} else {
// This will get retried next audit interval.
acct . logger . Error ( logText , zap . String ( "msgId" , msgId ) , zap . String ( "text" , errText ) )
2023-01-16 04:33:01 -08:00
}
}
2023-01-20 06:15:36 -08:00
// SubmitObservationsToContract is a free function to make a call to the smart contract to submit an observation request.
// If the submit fails or the result contains an error, it will return the error. If an error is returned, the caller is
// expected to use GetFailedIndexInBatch() to see which observation in the batch failed.
func SubmitObservationsToContract (
2023-01-16 04:33:01 -08:00
ctx context . Context ,
logger * zap . Logger ,
gk * ecdsa . PrivateKey ,
gsIndex uint32 ,
guardianIndex uint32 ,
2023-05-04 07:57:30 -07:00
wormchainConn AccountantWormchainConn ,
2023-01-16 04:33:01 -08:00
contract string ,
2023-01-20 06:15:36 -08:00
msgs [ ] * common . MessagePublication ,
2023-01-16 04:33:01 -08:00
) ( * sdktx . BroadcastTxResponse , error ) {
2023-01-20 06:15:36 -08:00
obs := make ( [ ] Observation , len ( msgs ) )
for idx , msg := range msgs {
obs [ idx ] = Observation {
2023-01-16 04:33:01 -08:00
TxHash : msg . TxHash . Bytes ( ) ,
Timestamp : uint32 ( msg . Timestamp . Unix ( ) ) ,
Nonce : msg . Nonce ,
EmitterChain : uint16 ( msg . EmitterChain ) ,
EmitterAddress : msg . EmitterAddress ,
Sequence : msg . Sequence ,
ConsistencyLevel : msg . ConsistencyLevel ,
Payload : msg . Payload ,
2023-01-20 06:15:36 -08:00
}
2023-04-06 11:32:00 -07:00
logger . Debug ( "in SubmitObservationsToContract, encoding observation" ,
2023-01-20 06:15:36 -08:00
zap . Int ( "idx" , idx ) ,
zap . String ( "txHash" , msg . TxHash . String ( ) ) , zap . String ( "encTxHash" , hex . EncodeToString ( obs [ idx ] . TxHash [ : ] ) ) ,
zap . Stringer ( "timeStamp" , msg . Timestamp ) , zap . Uint32 ( "encTimestamp" , obs [ idx ] . Timestamp ) ,
zap . Uint32 ( "nonce" , msg . Nonce ) , zap . Uint32 ( "encNonce" , obs [ idx ] . Nonce ) ,
zap . Stringer ( "emitterChain" , msg . EmitterChain ) , zap . Uint16 ( "encEmitterChain" , obs [ idx ] . EmitterChain ) ,
zap . Stringer ( "emitterAddress" , msg . EmitterAddress ) , zap . String ( "encEmitterAddress" , hex . EncodeToString ( obs [ idx ] . EmitterAddress [ : ] ) ) ,
zap . Uint64 ( "squence" , msg . Sequence ) , zap . Uint64 ( "encSequence" , obs [ idx ] . Sequence ) ,
zap . Uint8 ( "consistencyLevel" , msg . ConsistencyLevel ) , zap . Uint8 ( "encConsistencyLevel" , obs [ idx ] . ConsistencyLevel ) ,
zap . String ( "payload" , hex . EncodeToString ( msg . Payload ) ) , zap . String ( "encPayload" , hex . EncodeToString ( obs [ idx ] . Payload ) ) ,
)
2023-01-16 04:33:01 -08:00
}
bytes , err := json . Marshal ( obs )
if err != nil {
2023-04-06 11:32:00 -07:00
return nil , fmt . Errorf ( "failed to marshal accountant observation request: %w" , err )
2023-01-16 04:33:01 -08:00
}
2023-02-22 18:31:15 -08:00
digest , err := vaa . MessageSigningDigest ( submitObservationPrefix , bytes )
if err != nil {
2023-04-06 11:32:00 -07:00
return nil , fmt . Errorf ( "failed to sign accountant Observation request: %w" , err )
2023-02-22 18:31:15 -08:00
}
2023-01-16 04:33:01 -08:00
sigBytes , err := ethCrypto . Sign ( digest . Bytes ( ) , gk )
if err != nil {
2023-04-06 11:32:00 -07:00
return nil , fmt . Errorf ( "failed to sign accountant Observation request: %w" , err )
2023-01-16 04:33:01 -08:00
}
sig := SignatureType { Index : guardianIndex , Signature : sigBytes }
msgData := SubmitObservationsMsg {
Params : SubmitObservationsParams {
Observations : bytes ,
GuardianSetIndex : gsIndex ,
Signature : sig ,
} ,
}
msgBytes , err := json . Marshal ( msgData )
if err != nil {
2023-04-06 11:32:00 -07:00
return nil , fmt . Errorf ( "failed to marshal accountant observation request: %w" , err )
2023-01-16 04:33:01 -08:00
}
subMsg := wasmdtypes . MsgExecuteContract {
Sender : wormchainConn . SenderAddress ( ) ,
Contract : contract ,
Msg : msgBytes ,
Funds : sdktypes . Coins { } ,
}
2023-04-06 11:32:00 -07:00
logger . Debug ( "in SubmitObservationsToContract, sending broadcast" ,
2023-01-20 06:15:36 -08:00
zap . Int ( "numObs" , len ( obs ) ) ,
2023-01-16 04:33:01 -08:00
zap . String ( "observations" , string ( bytes ) ) ,
zap . Uint32 ( "gsIndex" , gsIndex ) , zap . Uint32 ( "guardianIndex" , guardianIndex ) ,
)
2023-01-24 13:03:18 -08:00
start := time . Now ( )
2023-01-16 04:33:01 -08:00
txResp , err := wormchainConn . SignAndBroadcastTx ( ctx , & subMsg )
if err != nil {
2023-01-20 06:15:36 -08:00
return txResp , fmt . Errorf ( "failed to send broadcast: %w" , err )
2023-01-16 04:33:01 -08:00
}
if txResp == nil {
2023-01-20 06:15:36 -08:00
return txResp , fmt . Errorf ( "sent broadcast but returned txResp is nil" )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
2023-01-16 04:33:01 -08:00
if txResp . TxResponse == nil {
2023-01-20 06:15:36 -08:00
return txResp , fmt . Errorf ( "sent broadcast but returned txResp.TxResponse is nil" )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
2023-01-16 04:33:01 -08:00
if txResp . TxResponse . RawLog == "" {
2023-01-20 06:15:36 -08:00
return txResp , fmt . Errorf ( "sent broadcast but raw_log is not set, unable to analyze the result" )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
if strings . Contains ( txResp . TxResponse . RawLog , "out of gas" ) {
return txResp , fmt . Errorf ( "out of gas: %s" , txResp . TxResponse . RawLog )
}
2023-01-16 04:33:01 -08:00
2023-01-20 06:15:36 -08:00
if strings . Contains ( txResp . TxResponse . RawLog , "failed to execute message" ) {
return txResp , fmt . Errorf ( "failed to submit observations: %s" , txResp . TxResponse . RawLog )
}
2023-04-06 11:32:00 -07:00
logger . Info ( "done sending broadcast" , zap . Int ( "numObs" , len ( obs ) ) , zap . Int64 ( "gasUsed" , txResp . TxResponse . GasUsed ) , zap . Stringer ( "elapsedTime" , time . Since ( start ) ) )
logger . Debug ( "in SubmitObservationsToContract, done sending broadcast" , zap . String ( "resp" , wormchainConn . BroadcastTxResponseToString ( txResp ) ) )
2023-01-20 06:15:36 -08:00
return txResp , nil
}
2023-01-16 04:33:01 -08:00
2023-01-20 06:15:36 -08:00
// GetObservationResponses is a free function that extracts the observation responses from a transaction response.
// It assumes the transaction response is valid (SubmitObservationsToContract() did not return an error).
func GetObservationResponses ( txResp * sdktx . BroadcastTxResponse ) ( map [ string ] ObservationResponseStatus , error ) {
data , err := hex . DecodeString ( txResp . TxResponse . Data )
if err != nil {
return nil , fmt . Errorf ( "failed to decode data: %w" , err )
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
var msg sdktypes . TxMsgData
2023-05-03 15:27:47 -07:00
if err := msg . Unmarshal ( data ) ; err != nil {
2023-01-20 06:15:36 -08:00
return nil , fmt . Errorf ( "failed to unmarshal data: %w" , err )
}
2023-01-28 04:56:43 -08:00
if len ( msg . Data ) == 0 {
return nil , fmt . Errorf ( "data field is empty" )
}
2023-01-20 06:15:36 -08:00
var execContractResp wasmdtypes . MsgExecuteContractResponse
if err := execContractResp . Unmarshal ( msg . Data [ 0 ] . Data ) ; err != nil {
return nil , fmt . Errorf ( "failed to unmarshal ExecuteContractResponse: %w" , err )
}
var responses ObservationResponses
err = json . Unmarshal ( execContractResp . Data , & responses )
if err != nil {
return nil , fmt . Errorf ( "failed to unmarshal responses: %w" , err )
}
out := make ( map [ string ] ObservationResponseStatus )
for _ , resp := range responses {
out [ resp . Key . String ( ) ] = resp . Status
2023-01-16 04:33:01 -08:00
}
2023-01-20 06:15:36 -08:00
return out , nil
2023-01-16 04:33:01 -08:00
}