diff --git a/node/pkg/common/chainlock.go b/node/pkg/common/chainlock.go index b97b23d09..052d1bbd5 100644 --- a/node/pkg/common/chainlock.go +++ b/node/pkg/common/chainlock.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "time" @@ -14,6 +15,9 @@ import ( "github.com/ethereum/go-ethereum/common" ) +const HashLength = 32 +const AddressLength = 32 + type MessagePublication struct { TxHash common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash Timestamp time.Time @@ -24,6 +28,7 @@ type MessagePublication struct { EmitterChain vaa.ChainID EmitterAddress vaa.Address Payload []byte + IsReobservation bool // Unreliable indicates if this message can be reobserved. If a message is considered unreliable it cannot be // reobserved. @@ -38,7 +43,7 @@ func (msg *MessagePublication) MessageIDString() string { return fmt.Sprintf("%v/%v/%v", uint16(msg.EmitterChain), msg.EmitterAddress, msg.Sequence) } -const minMsgLength = 88 +const minMsgLength = 88 // Marshalled length with empty payload func (msg *MessagePublication) Marshal() ([]byte, error) { buf := new(bytes.Buffer) @@ -50,15 +55,19 @@ func (msg *MessagePublication) Marshal() ([]byte, error) { vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel) vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain) buf.Write(msg.EmitterAddress[:]) + vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation) buf.Write(msg.Payload) return buf.Bytes(), nil } -// Unmarshal deserializes the binary representation of a VAA -func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { - if len(data) < minMsgLength { - return nil, fmt.Errorf("message is too short") +const oldMinMsgLength = 83 // Old marshalled length with empty payload + +// UnmarshalOldMessagePublicationBeforeIsReobservation deserializes a MessagePublication from prior to the addition of IsReobservation. +// This function can be deleted once all guardians have been upgraded. That's why the code is just duplicated. +func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessagePublication, error) { + if len(data) < oldMinMsgLength { + return nil, errors.New("message is too short") } msg := &MessagePublication{} @@ -66,7 +75,7 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { reader := bytes.NewReader(data[:]) txHash := common.Hash{} - if n, err := reader.Read(txHash[:]); err != nil || n != 32 { + if n, err := reader.Read(txHash[:]); err != nil || n != HashLength { return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err) } msg.TxHash = txHash @@ -94,7 +103,7 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { } emitterAddress := vaa.Address{} - if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 { + if n, err := reader.Read(emitterAddress[:]); err != nil || n != AddressLength { return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err) } msg.EmitterAddress = emitterAddress @@ -109,6 +118,64 @@ func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { return msg, nil } +// UnmarshalMessagePublication deserializes a MessagePublication +func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { + if len(data) < minMsgLength { + return nil, fmt.Errorf("message is too short") + } + + msg := &MessagePublication{} + + reader := bytes.NewReader(data[:]) + + txHash := common.Hash{} + if n, err := reader.Read(txHash[:]); err != nil || n != HashLength { + return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err) + } + msg.TxHash = txHash + + unixSeconds := uint32(0) + if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil { + return nil, fmt.Errorf("failed to read timestamp: %w", err) + } + msg.Timestamp = time.Unix(int64(unixSeconds), 0) + + if err := binary.Read(reader, binary.BigEndian, &msg.Nonce); err != nil { + return nil, fmt.Errorf("failed to read nonce: %w", err) + } + + if err := binary.Read(reader, binary.BigEndian, &msg.Sequence); err != nil { + return nil, fmt.Errorf("failed to read sequence: %w", err) + } + + if err := binary.Read(reader, binary.BigEndian, &msg.ConsistencyLevel); err != nil { + return nil, fmt.Errorf("failed to read consistency level: %w", err) + } + + if err := binary.Read(reader, binary.BigEndian, &msg.EmitterChain); err != nil { + return nil, fmt.Errorf("failed to read emitter chain: %w", err) + } + + emitterAddress := vaa.Address{} + if n, err := reader.Read(emitterAddress[:]); err != nil || n != AddressLength { + return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err) + } + msg.EmitterAddress = emitterAddress + + if err := binary.Read(reader, binary.BigEndian, &msg.IsReobservation); err != nil { + return nil, fmt.Errorf("failed to read isReobservation: %w", err) + } + + payload := make([]byte, reader.Len()) + n, err := reader.Read(payload) + if err != nil || n == 0 { + return nil, fmt.Errorf("failed to read payload [%d]: %w", n, err) + } + msg.Payload = payload[:n] + + return msg, nil +} + // The standard json Marshal / Unmarshal of time.Time gets confused between local and UTC time. func (msg *MessagePublication) MarshalJSON() ([]byte, error) { type Alias MessagePublication diff --git a/node/pkg/db/accountant.go b/node/pkg/db/accountant.go index 977dd296b..458def819 100644 --- a/node/pkg/db/accountant.go +++ b/node/pkg/db/accountant.go @@ -31,11 +31,22 @@ func (d *MockAccountantDB) AcctGetData(logger *zap.Logger) ([]*common.MessagePub return nil, nil } -const acctPendingTransfer = "ACCT:PXFER:" +const acctOldPendingTransfer = "ACCT:PXFER:" +const acctOldPendingTransferLen = len(acctOldPendingTransfer) + +const acctPendingTransfer = "ACCT:PXFER2:" const acctPendingTransferLen = len(acctPendingTransfer) const acctMinMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0") +func acctOldPendingTransferMsgID(msgId string) []byte { + return []byte(fmt.Sprintf("%v%v", acctOldPendingTransfer, msgId)) +} + +func acctIsOldPendingTransfer(keyBytes []byte) bool { + return (len(keyBytes) >= acctOldPendingTransferLen+acctMinMsgIdLen) && (string(keyBytes[0:acctOldPendingTransferLen]) == acctOldPendingTransfer) +} + func acctPendingTransferMsgID(msgId string) []byte { return []byte(fmt.Sprintf("%v%v", acctPendingTransfer, msgId)) } @@ -47,36 +58,92 @@ func acctIsPendingTransfer(keyBytes []byte) bool { // This is called by the accountant on start up to reload pending transfers. func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication, error) { pendingTransfers := []*common.MessagePublication{} - prefixBytes := []byte(acctPendingTransfer) - err := d.db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 10 - it := txn.NewIterator(opts) - defer it.Close() - for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() { - item := it.Item() - key := item.Key() - val, err := item.ValueCopy(nil) - if err != nil { - return err - } - - if acctIsPendingTransfer(key) { - var pt common.MessagePublication - err := json.Unmarshal(val, &pt) + var err error + { + prefixBytes := []byte(acctPendingTransfer) + err = d.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() { + item := it.Item() + key := item.Key() + val, err := item.ValueCopy(nil) if err != nil { - logger.Error("failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err)) - continue + return err } - pendingTransfers = append(pendingTransfers, &pt) - } else { - return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key)) + if acctIsPendingTransfer(key) { + var pt common.MessagePublication + err := json.Unmarshal(val, &pt) + if err != nil { + logger.Error("failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err)) + continue + } + + pendingTransfers = append(pendingTransfers, &pt) + } else { + return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key)) + } + } + + return nil + }) + } + + // See if we have any old format pending transfers. + if err == nil { + oldPendingTransfers := []*common.MessagePublication{} + prefixBytes := []byte(acctOldPendingTransfer) + err = d.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() { + item := it.Item() + key := item.Key() + val, err := item.ValueCopy(nil) + if err != nil { + return err + } + + if acctIsOldPendingTransfer(key) { + pt, err := common.UnmarshalOldMessagePublicationBeforeIsReobservation(val) + if err != nil { + logger.Error("failed to unmarshal old pending transfer for key", zap.String("key", string(key[:])), zap.Error(err)) + continue + } + + oldPendingTransfers = append(oldPendingTransfers, pt) + } else { + return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key)) + } + } + + return nil + }) + + if err == nil && len(oldPendingTransfers) != 0 { + pendingTransfers = append(pendingTransfers, oldPendingTransfers...) + for _, pt := range oldPendingTransfers { + logger.Info("updating format of database entry for pending vaa", zap.String("msgId", pt.MessageIDString())) + err := d.AcctStorePendingTransfer(pt) + if err != nil { + return pendingTransfers, fmt.Errorf("failed to write new pending msg for key [%v]: %w", pt.MessageIDString(), err) + } + + key := acctOldPendingTransferMsgID(pt.MessageIDString()) + if err := d.db.Update(func(txn *badger.Txn) error { + err := txn.Delete(key) + return err + }); err != nil { + return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt.MessageIDString(), err) + } } } - - return nil - }) + } return pendingTransfers, err } diff --git a/node/pkg/db/accountant_test.go b/node/pkg/db/accountant_test.go index 301f35f21..868a9c648 100644 --- a/node/pkg/db/accountant_test.go +++ b/node/pkg/db/accountant_test.go @@ -1,6 +1,10 @@ package db import ( + "bytes" + "encoding/binary" + "os" + "sort" "testing" "time" @@ -30,19 +34,22 @@ func TestAcctPendingTransferMsgID(t *testing.T) { ConsistencyLevel: 16, } - assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString())) + assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString())) + assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString())) } func TestAcctIsPendingTransfer(t *testing.T) { - assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:1"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:1/1/1"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) - assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) + assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1/1/1"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) + assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) assert.Equal(t, false, acctIsPendingTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, acctIsPendingTransfer([]byte{0x01, 0x02, 0x03, 0x04})) assert.Equal(t, false, acctIsPendingTransfer([]byte{})) + assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) } func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { @@ -121,9 +128,9 @@ func TestAcctGetEmptyData(t *testing.T) { logger, _ := zap.NewDevelopment() - pendingTransfers, err := db.AcctGetData(logger) + pendings, err := db.AcctGetData(logger) require.NoError(t, err) - assert.Equal(t, 0, len(pendingTransfers)) + assert.Equal(t, 0, len(pendings)) } func TestAcctGetData(t *testing.T) { @@ -186,10 +193,98 @@ func TestAcctGetData(t *testing.T) { err = db.AcctStorePendingTransfer(&msg1a) require.NoError(t, err) - pendingTransfers, err := db.AcctGetData(logger) + pendings, err := db.AcctGetData(logger) require.NoError(t, err) - require.Equal(t, 2, len(pendingTransfers)) + require.Equal(t, 2, len(pendings)) - assert.Equal(t, msg1a, *pendingTransfers[0]) - assert.Equal(t, *msg2, *pendingTransfers[1]) + assert.Equal(t, msg1a, *pendings[0]) + assert.Equal(t, *msg2, *pendings[1]) +} + +func TestAcctLoadingOldPendings(t *testing.T) { + dbPath := t.TempDir() + db, err := Open(dbPath) + if err != nil { + t.Error("failed to open database") + } + defer db.Close() + defer os.Remove(dbPath) + + tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") + require.NoError(t, err) + + now := time.Unix(time.Now().Unix(), 0) + + // Write the first pending event in the old format. + pending1 := &common.MessagePublication{ + TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + Timestamp: now, + Nonce: 123456, + Sequence: 789101112131417, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddr, + Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + ConsistencyLevel: 16, + // IsReobservation will not be serialized. It should be set to false on reload. + } + + db.acctStoreOldPendingTransfer(t, pending1) + require.Nil(t, err) + + now2 := now.Add(time.Second * 5) + + // Write the second one in the new format. + pending2 := &common.MessagePublication{ + TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + Timestamp: now2, + Nonce: 123456, + Sequence: 789101112131418, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddr, + Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + ConsistencyLevel: 16, + IsReobservation: true, + } + + err = db.AcctStorePendingTransfer(pending2) + require.Nil(t, err) + + logger := zap.NewNop() + pendings, err := db.AcctGetData(logger) + require.NoError(t, err) + require.Equal(t, 2, len(pendings)) + + // Updated old pending events get placed at the end, so we need to sort into timestamp order. + sort.SliceStable(pendings, func(i, j int) bool { + return pendings[i].Timestamp.Before(pendings[j].Timestamp) + }) + + assert.Equal(t, *pending1, *pendings[0]) + assert.Equal(t, *pending2, *pendings[1]) + + // Make sure we can reload the updated pendings. + pendings2, err := db.AcctGetData(logger) + + require.Nil(t, err) + require.Equal(t, 2, len(pendings2)) + + assert.Equal(t, pending1, pendings2[0]) + assert.Equal(t, pending2, pendings2[1]) +} + +func (d *Database) acctStoreOldPendingTransfer(t *testing.T, msg *common.MessagePublication) { + buf := new(bytes.Buffer) + + b := marshalOldMessagePublication(msg) + + vaa.MustWrite(buf, binary.BigEndian, b) + + err := d.db.Update(func(txn *badger.Txn) error { + if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), buf.Bytes()); err != nil { + return err + } + return nil + }) + + require.NoError(t, err) } diff --git a/node/pkg/db/governor.go b/node/pkg/db/governor.go index 0dc906767..ac8c38b59 100644 --- a/node/pkg/db/governor.go +++ b/node/pkg/db/governor.go @@ -13,9 +13,6 @@ import ( "go.uber.org/zap" ) -// WARNING: Change me in ./node/governor as well -const maxEnqueuedTime = time.Hour * 24 - type GovernorDB interface { StoreTransfer(t *Transfer) error StorePendingMsg(k *PendingTransfer) error @@ -209,7 +206,7 @@ func (p *PendingTransfer) Marshal() ([]byte, error) { return buf.Bytes(), nil } -func UnmarshalPendingTransfer(data []byte) (*PendingTransfer, error) { +func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error) { p := &PendingTransfer{} reader := bytes.NewReader(data[:]) @@ -227,9 +224,14 @@ func UnmarshalPendingTransfer(data []byte) (*PendingTransfer, error) { return nil, fmt.Errorf("failed to read pending transfer msg [%d]: %w", n, err) } - msg, err := common.UnmarshalMessagePublication(buf) + var msg *common.MessagePublication + if isOld { + msg, err = common.UnmarshalOldMessagePublicationBeforeIsReobservation(buf) + } else { + msg, err = common.UnmarshalMessagePublication(buf) + } if err != nil { - return nil, fmt.Errorf("failed to unmarshal pending transfer msg: %w", err) + return nil, fmt.Errorf("failed to unmarshal pending transfer msg, isOld: %t: %w", isOld, err) } p.Msg = *msg @@ -243,13 +245,13 @@ const transfer = "GOV:XFER2:" const transferLen = len(transfer) // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field. -// The first time we run this new release, any existing entries with the "GOV:PENDING" tag will get converted -// to the new format and given the "GOV:PENDING2" format. In a future release, the "GOV:PENDING" code can be deleted. +// The first time we run this new release, any existing entries with the "GOV:PENDING2" tag will get converted +// to the new format and given the "GOV:PENDING3" format. In a future release, the "GOV:PENDING2" code can be deleted. -const oldPending = "GOV:PENDING:" +const oldPending = "GOV:PENDING2:" const oldPendingLen = len(oldPending) -const pending = "GOV:PENDING2:" +const pending = "GOV:PENDING3:" const pendingLen = len(pending) const minMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0") @@ -308,20 +310,20 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time } if IsPendingMsg(key) { - p, err := UnmarshalPendingTransfer(val) + p, err := UnmarshalPendingTransfer(val, false) if err != nil { return err } - if time.Until(p.ReleaseTime) > maxEnqueuedTime { - p.ReleaseTime = now.Add(maxEnqueuedTime) - err := d.StorePendingMsg(p) - if err != nil { - return fmt.Errorf("failed to write new pending msg for key [%v]: %w", p.Msg.MessageIDString(), err) - } + pending = append(pending, p) + } else if isOldPendingMsg(key) { + p, err := UnmarshalPendingTransfer(val, true) + if err != nil { + return err } pending = append(pending, p) + oldPendingToUpdate = append(oldPendingToUpdate, p) } else if IsTransfer(key) { v, err := UnmarshalTransfer(val) if err != nil { @@ -329,15 +331,7 @@ func (d *Database) GetChainGovernorDataForTime(logger *zap.Logger, now time.Time } transfers = append(transfers, v) - } else if isOldPendingMsg(key) { - msg, err := common.UnmarshalMessagePublication(val) - if err != nil { - return err - } - p := &PendingTransfer{ReleaseTime: now.Add(maxEnqueuedTime), Msg: *msg} - pending = append(pending, p) - oldPendingToUpdate = append(oldPendingToUpdate, p) } else if isOldTransfer(key) { v, err := unmarshalOldTransfer(val) if err != nil { diff --git a/node/pkg/db/governor_test.go b/node/pkg/db/governor_test.go index 2ea03e0fb..378374caa 100644 --- a/node/pkg/db/governor_test.go +++ b/node/pkg/db/governor_test.go @@ -70,7 +70,7 @@ func TestPendingMsgID(t *testing.T) { ConsistencyLevel: 16, } - assert.Equal(t, []byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1)) + assert.Equal(t, []byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1)) } func TestTransferMsgID(t *testing.T) { @@ -110,18 +110,18 @@ func TestIsTransfer(t *testing.T) { } func TestIsPendingMsg(t *testing.T) { - assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1/1/1"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) - assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) + assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04})) assert.Equal(t, false, IsPendingMsg([]byte{})) - assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) } func TestGetChainGovernorData(t *testing.T) { @@ -286,6 +286,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { EmitterAddress: tokenBridgeAddr, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, ConsistencyLevel: 16, + IsReobservation: true, } pending1 := &PendingTransfer{ @@ -296,12 +297,12 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { bytes, err := pending1.Marshal() require.NoError(t, err) - pending2, err := UnmarshalPendingTransfer(bytes) + pending2, err := UnmarshalPendingTransfer(bytes, false) require.NoError(t, err) assert.Equal(t, pending1, pending2) - expectedPendingKey := "GOV:PENDING2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" + expectedPendingKey := "GOV:PENDING3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg))) } @@ -395,18 +396,40 @@ func TestStoreAndReloadTransfers(t *testing.T) { assert.Equal(t, pending2, pending[1]) } -func (d *Database) storeOldPendingMsg(t *testing.T, k *common.MessagePublication) { - b, _ := k.Marshal() +func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) { + buf := new(bytes.Buffer) + + vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix())) + + b := marshalOldMessagePublication(&p.Msg) + + vaa.MustWrite(buf, binary.BigEndian, b) err := d.db.Update(func(txn *badger.Txn) error { - if err := txn.Set(oldPendingMsgID(k), b); err != nil { + if err := txn.Set(oldPendingMsgID(&p.Msg), buf.Bytes()); err != nil { return err } return nil }) + require.NoError(t, err) } +func marshalOldMessagePublication(msg *common.MessagePublication) []byte { + buf := new(bytes.Buffer) + + buf.Write(msg.TxHash[:]) + vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix())) + vaa.MustWrite(buf, binary.BigEndian, msg.Nonce) + vaa.MustWrite(buf, binary.BigEndian, msg.Sequence) + vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel) + vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain) + buf.Write(msg.EmitterAddress[:]) + buf.Write(msg.Payload) + + return buf.Bytes() +} + func TestLoadingOldPendingTransfers(t *testing.T) { dbPath := t.TempDir() db, err := Open(dbPath) @@ -454,7 +477,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) { // Write the first pending event in the old format. pending1 := &PendingTransfer{ - ReleaseTime: now.Add(time.Hour * 72), // Since we are writing this in the old format, this will not get stored, but computed on reload. + ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default., Msg: common.MessagePublication{ TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: now, @@ -464,10 +487,11 @@ func TestLoadingOldPendingTransfers(t *testing.T) { EmitterAddress: tokenBridgeAddr, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, ConsistencyLevel: 16, + // IsReobservation will not be serialized. It should be set to false on reload. }, } - db.storeOldPendingMsg(t, &pending1.Msg) + db.storeOldPendingMsg(t, pending1) require.Nil(t, err) now2 := now.Add(time.Second * 5) @@ -484,6 +508,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) { EmitterAddress: tokenBridgeAddr, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, ConsistencyLevel: 16, + IsReobservation: true, }, } diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 358b1843a..21b8cdbe4 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -47,7 +47,6 @@ const ( transferEnqueued = false ) -// WARNING: Change me in ./node/db as well const maxEnqueuedTime = time.Hour * 24 type ( diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index 851c0681f..39541bd11 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -163,6 +163,12 @@ func (p *Processor) handleCleanup(ctx context.Context) { break } + // Reobservation requests should not be resubmitted but we will keep waiting for more observations. + if s.ourObservation.IsReobservation() { + p.logger.Debug("not submitting reobservation request for reobservation", zap.String("digest", hash), zap.Duration("delta", delta)) + break + } + // If we have already stored this VAA, there is no reason for us to request reobservation. alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s) if err != nil { diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index af9572b49..9d9f44b2b 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -76,7 +76,8 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { Sequence: k.Sequence, ConsistencyLevel: k.ConsistencyLevel, }, - Unreliable: k.Unreliable, + Unreliable: k.Unreliable, + Reobservation: k.IsReobservation, } // Generate digest of the unsigned VAA. @@ -100,7 +101,9 @@ func (p *Processor) handleMessage(k *common.MessagePublication) { zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())), zap.Uint8("consistency_level", k.ConsistencyLevel), zap.String("message_id", v.MessageID()), - zap.String("signature", hex.EncodeToString(s))) + zap.String("signature", hex.EncodeToString(s)), + zap.Bool("isReobservation", k.IsReobservation), + ) messagesSignedTotal.With(prometheus.Labels{ "emitter_chain": k.EmitterChain.String()}).Add(1) diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 120274dc6..b29c2623f 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -40,6 +40,8 @@ type ( SigningDigest() ethcommon.Hash // IsReliable returns whether this message is considered reliable meaning it can be reobserved. IsReliable() bool + // IsReobservation returns whether this message is the result of a reobservation request. + IsReobservation() bool // HandleQuorum finishes processing the observation once a quorum of signatures have // been received for it. HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index 592ad2cd9..7216c02c9 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -7,7 +7,8 @@ import ( type VAA struct { vaa.VAA - Unreliable bool + Unreliable bool + Reobservation bool } func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { @@ -41,3 +42,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { func (v *VAA) IsReliable() bool { return !v.Unreliable } + +func (v *VAA) IsReobservation() bool { + return v.Reobservation +} diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index aac42b067..5928bed40 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -130,7 +130,7 @@ func gatherObservations(e *Watcher, t types.SignedTxnWithAD, depth int, logger * // lookAtTxn takes an outer transaction from the block.payset and gathers // observations from messages emitted in nested inner transactions // then passes them on the relevant channels -func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.Logger) { +func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.Logger, isReobservation bool) { observations := gatherObservations(e, t.SignedTxnWithAD, 0, logger) @@ -165,6 +165,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap. EmitterAddress: obs.emitterAddress, Payload: obs.payload, ConsistencyLevel: 0, + IsReobservation: isReobservation, } algorandMessagesConfirmed.Inc() @@ -261,7 +262,7 @@ func (e *Watcher) Run(ctx context.Context) error { } for _, element := range block.Payset { - lookAtTxn(e, element, block, logger) + lookAtTxn(e, element, block, logger, true) } } @@ -287,7 +288,7 @@ func (e *Watcher) Run(ctx context.Context) error { } for _, element := range block.Payset { - lookAtTxn(e, element, block, logger) + lookAtTxn(e, element, block, logger, false) } e.next_round = e.next_round + 1 diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index 467191d98..9ea36ac4b 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -146,7 +146,7 @@ func (e *Watcher) Run(ctx context.Context) error { if !data.Exists() { break } - e.observeData(logger, data, nativeSeq) + e.observeData(logger, data, nativeSeq, true) } case <-timer.C: @@ -201,7 +201,7 @@ func (e *Watcher) Run(ctx context.Context) error { if !data.Exists() { continue } - e.observeData(logger, data, eventSequence.Uint()) + e.observeData(logger, data, eventSequence.Uint(), false) } health, err := e.retrievePayload(aptosHealth) @@ -250,7 +250,7 @@ func (e *Watcher) retrievePayload(s string) ([]byte, error) { return body, err } -func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64) { +func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64, isReobservation bool) { em := data.Get("sender") if !em.Exists() { logger.Error("sender field missing") @@ -313,6 +313,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u EmitterAddress: a, Payload: pl, ConsistencyLevel: uint8(consistencyLevel.Uint()), + IsReobservation: isReobservation, } aptosMessagesConfirmed.Inc() diff --git a/node/pkg/watchers/cosmwasm/watcher.go b/node/pkg/watchers/cosmwasm/watcher.go index 02b3babe2..2ba503bab 100644 --- a/node/pkg/watchers/cosmwasm/watcher.go +++ b/node/pkg/watchers/cosmwasm/watcher.go @@ -304,6 +304,7 @@ func (e *Watcher) Run(ctx context.Context) error { msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, contractAddressLogKey) for _, msg := range msgs { + msg.IsReobservation = true e.msgC <- msg messagesConfirmed.WithLabelValues(networkName).Inc() } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 00d051ec4..0cbc15996 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -423,6 +423,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } for _, msg := range msgs { + msg.IsReobservation = true if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately { logger.Info("re-observed message publication transaction, publishing it immediately", zap.Stringer("tx", msg.TxHash), diff --git a/node/pkg/watchers/ibc/watcher.go b/node/pkg/watchers/ibc/watcher.go index aa3890635..880716810 100644 --- a/node/pkg/watchers/ibc/watcher.go +++ b/node/pkg/watchers/ibc/watcher.go @@ -476,6 +476,7 @@ func (w *Watcher) handleObservationRequests(ctx context.Context, ce *chainEntry) } if evt != nil { + evt.Msg.IsReobservation = true if err := w.processIbcReceivePublishEvent(evt, "reobservation"); err != nil { return fmt.Errorf("failed to process reobserved IBC event: %w", err) } diff --git a/node/pkg/watchers/mock/watcher.go b/node/pkg/watchers/mock/watcher.go index 15543558b..86b5b2fff 100644 --- a/node/pkg/watchers/mock/watcher.go +++ b/node/pkg/watchers/mock/watcher.go @@ -41,7 +41,9 @@ func NewWatcherRunnable( logger.Info("Received obsv request", zap.String("log_msg_type", "obsv_req_received"), zap.String("tx_hash", hash.Hex())) msg, ok := c.ObservationDb[hash] if ok { - msgC <- msg + msg2 := *msg + msg2.IsReobservation = true + msgC <- &msg2 } } } diff --git a/node/pkg/watchers/near/poll.go b/node/pkg/watchers/near/poll.go index 5343764c4..9839e83fd 100644 --- a/node/pkg/watchers/near/poll.go +++ b/node/pkg/watchers/near/poll.go @@ -21,7 +21,7 @@ func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, ch result := make([]*transactionProcessingJob, len(txns)) for i, tx := range txns { - result[i] = newTransactionProcessingJob(tx.Hash, tx.SignerId) + result[i] = newTransactionProcessingJob(tx.Hash, tx.SignerId, false) } return result, nil } diff --git a/node/pkg/watchers/near/tx_processing.go b/node/pkg/watchers/near/tx_processing.go index 4645408b9..afcf17c79 100644 --- a/node/pkg/watchers/near/tx_processing.go +++ b/node/pkg/watchers/near/tx_processing.go @@ -243,6 +243,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job EmitterAddress: a, Payload: pl, ConsistencyLevel: 0, + IsReobservation: job.isReobservation, } // tell everyone about it diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go index ed1fbf769..3da2f5266 100644 --- a/node/pkg/watchers/near/watcher.go +++ b/node/pkg/watchers/near/watcher.go @@ -56,6 +56,7 @@ type ( creationTime time.Time retryCounter uint delay time.Duration + isReobservation bool // set during processing hasWormholeMsg bool // set during processing; whether this transaction emitted a Wormhole message @@ -111,13 +112,14 @@ func NewWatcher( } } -func newTransactionProcessingJob(txHash string, senderAccountId string) *transactionProcessingJob { +func newTransactionProcessingJob(txHash string, senderAccountId string, isReobservation bool) *transactionProcessingJob { return &transactionProcessingJob{ txHash, senderAccountId, time.Now(), 0, initialTxProcDelay, + isReobservation, false, } } @@ -204,7 +206,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { // This value is used by NEAR to determine which shard to query. An incorrect value here is not a security risk but could lead to reobservation requests failing. // Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future. // Fixing this would require adding the transaction sender account ID to the observation request. - job := newTransactionProcessingJob(txHash, e.wormholeAccount) + job := newTransactionProcessingJob(txHash, e.wormholeAccount, true) err := e.schedule(ctx, job, time.Nanosecond) if err != nil { // Error-level logging here because this is after an re-observation request already, which should be infrequent diff --git a/node/pkg/watchers/near/watcher_test.go b/node/pkg/watchers/near/watcher_test.go index 3865a35ed..0389c9625 100644 --- a/node/pkg/watchers/near/watcher_test.go +++ b/node/pkg/watchers/near/watcher_test.go @@ -150,6 +150,7 @@ func (testCase *testCase) run(ctx context.Context) error { // assert that messages were re-observed correctly... expectedMsgReObserved := map[string]*testMessageTracker{} for _, em := range testCase.expectedMsgReObserved { + em.IsReobservation = true expectedMsgReObserved[em.MessageIDString()] = &testMessageTracker{MessagePublication: em, seen: false} } diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 4c54f2e7f..f582dbfc2 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -311,7 +311,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { case <-ctx.Done(): return nil case msg := <-s.pumpData: - err := s.processAccountSubscriptionData(ctx, logger, msg) + err := s.processAccountSubscriptionData(ctx, logger, msg, false) if err != nil { p2p.DefaultRegistry.AddErrorCount(s.chainID, 1) solanaConnectionErrors.WithLabelValues(s.networkName, string(s.commitment), "account_subscription_data").Inc() @@ -327,7 +327,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { logger.Info("received observation request", zap.String("account", acc.String())) rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) - s.fetchMessageAccount(rCtx, logger, acc, 0) + s.fetchMessageAccount(rCtx, logger, acc, 0, true) cancel() case <-timer.C: // Get current slot height @@ -370,7 +370,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { for slot := rangeStart; slot <= rangeEnd; slot++ { _slot := slot common.RunWithScissors(ctx, s.errC, "SolanaWatcherSlotFetcher", func(ctx context.Context) error { - s.retryFetchBlock(ctx, logger, _slot, 0) + s.retryFetchBlock(ctx, logger, _slot, 0, false) return nil }) } @@ -389,8 +389,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { } } -func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, retry uint) { - ok := s.fetchBlock(ctx, logger, slot, 0) +func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, retry uint, isReobservation bool) { + ok := s.fetchBlock(ctx, logger, slot, 0, isReobservation) if !ok { if retry >= maxRetries { @@ -409,13 +409,13 @@ func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, zap.Uint("retry", retry)) common.RunWithScissors(ctx, s.errC, "retryFetchBlock", func(ctx context.Context) error { - s.retryFetchBlock(ctx, logger, slot, retry+1) + s.retryFetchBlock(ctx, logger, slot, retry+1, isReobservation) return nil }) } } -func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, emptyRetry uint) (ok bool) { +func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, emptyRetry uint, isReobservation bool) (ok bool) { logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment)), @@ -455,7 +455,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot if emptyRetry < maxEmptyRetry { common.RunWithScissors(ctx, s.errC, "delayedFetchBlock", func(ctx context.Context) error { time.Sleep(retryDelay) - s.fetchBlock(ctx, logger, slot, emptyRetry+1) + s.fetchBlock(ctx, logger, slot, emptyRetry+1, isReobservation) return nil }) } @@ -545,7 +545,7 @@ OUTER: // Find top-level instructions for i, inst := range tx.Message.Instructions { - found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i) + found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation) if err != nil { logger.Error("malformed Wormhole instruction", zap.Error(err), @@ -591,7 +591,7 @@ OUTER: for _, inner := range tr.Meta.InnerInstructions { for i, inst := range inner.Instructions { - _, err = s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i) + _, err = s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation) if err != nil { logger.Error("malformed Wormhole instruction", zap.Error(err), @@ -614,7 +614,7 @@ OUTER: return true } -func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int) (bool, error) { +func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) { if inst.ProgramIDIndex != programIndex { return false, nil } @@ -657,15 +657,15 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx)) common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error { - s.retryFetchMessageAccount(ctx, logger, acc, slot, 0) + s.retryFetchMessageAccount(ctx, logger, acc, slot, 0, isReobservation) return nil }) return true, nil } -func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint) { - retryable := s.fetchMessageAccount(ctx, logger, acc, slot) +func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint, isReobservation bool) { + retryable := s.fetchMessageAccount(ctx, logger, acc, slot, isReobservation) if retryable { if retry >= maxRetries { @@ -686,13 +686,13 @@ func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *za zap.Uint("retry", retry)) common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error { - s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1) + s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1, isReobservation) return nil }) } } -func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64) (retryable bool) { +func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, isReobservation bool) (retryable bool) { // Fetching account rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() @@ -741,11 +741,11 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Log zap.Stringer("account", acc), zap.Binary("data", data)) - s.processMessageAccount(logger, data, acc) + s.processMessageAccount(logger, data, acc, isReobservation) return false } -func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger *zap.Logger, data []byte) error { +func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger *zap.Logger, data []byte, isReobservation bool) error { // Do we have an error on the subscription? var e EventSubscriptionError err := json.Unmarshal(data, &e) @@ -795,7 +795,7 @@ func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger switch string(data[:3]) { case accountPrefixReliable, accountPrefixUnreliable: acc := solana.PublicKeyFromBytes([]byte(value.Pubkey)) - s.processMessageAccount(logger, data, acc) + s.processMessageAccount(logger, data, acc, isReobservation) default: break } @@ -803,7 +803,7 @@ func (s *SolanaWatcher) processAccountSubscriptionData(_ context.Context, logger return nil } -func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) { +func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey, isReobservation bool) { proposal, err := ParseMessagePublicationAccount(data) if err != nil { solanaAccountSkips.WithLabelValues(s.networkName, "parse_transfer_out").Inc() @@ -837,6 +837,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a EmitterAddress: proposal.EmitterAddress, Payload: proposal.Payload, ConsistencyLevel: proposal.ConsistencyLevel, + IsReobservation: isReobservation, Unreliable: !reliable, } diff --git a/node/pkg/watchers/sui/watcher.go b/node/pkg/watchers/sui/watcher.go index 1d238cbbb..bb3c7826b 100644 --- a/node/pkg/watchers/sui/watcher.go +++ b/node/pkg/watchers/sui/watcher.go @@ -173,7 +173,7 @@ func NewWatcher( } } -func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error { +func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservation bool) error { if body.ID.TxDigest == nil { return errors.New("Missing TxDigest field") } @@ -249,6 +249,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error { EmitterAddress: emitter, Payload: fields.Payload, ConsistencyLevel: *fields.ConsistencyLevel, + IsReobservation: isReobservation, } suiMessagesConfirmed.Inc() @@ -374,7 +375,7 @@ func (e *Watcher) Run(ctx context.Context) error { } if res.Params != nil && (*res.Params).Result != nil { - err := e.inspectBody(logger, *(*res.Params).Result) + err := e.inspectBody(logger, *(*res.Params).Result, false) if err != nil { logger.Error(fmt.Sprintf("inspectBody: %s", err.Error())) } @@ -491,7 +492,7 @@ func (e *Watcher) Run(ctx context.Context) error { } for i, chunk := range res.Result { - err := e.inspectBody(logger, chunk) + err := e.inspectBody(logger, chunk, true) if err != nil { logger.Info("skipping event data in result", zap.String("txhash", tx58), zap.Int("index", i), zap.Error(err)) }