Node/Gov: Add target to db (#3791)

* Node/Gov: Add target to db

* Code review rework

* Switch tests to use ErrorContains
This commit is contained in:
bruce-riley 2024-03-05 09:46:50 -06:00 committed by GitHub
parent 0a49c6ee28
commit 67466838bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 579 additions and 133 deletions

View File

@ -53,6 +53,8 @@ type Transfer struct {
EmitterAddress vaa.Address EmitterAddress vaa.Address
MsgID string MsgID string
Hash string Hash string
TargetAddress vaa.Address
TargetChain vaa.ChainID
} }
func (t *Transfer) Marshal() ([]byte, error) { func (t *Transfer) Marshal() ([]byte, error) {
@ -72,6 +74,8 @@ func (t *Transfer) Marshal() ([]byte, error) {
if len(t.Hash) > 0 { if len(t.Hash) > 0 {
buf.Write([]byte(t.Hash)) buf.Write([]byte(t.Hash))
} }
vaa.MustWrite(buf, binary.BigEndian, t.TargetChain)
buf.Write(t.TargetAddress[:])
return buf.Bytes(), nil return buf.Bytes(), nil
} }
@ -91,17 +95,93 @@ func UnmarshalTransfer(data []byte) (*Transfer, error) {
} }
if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil { if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil {
return nil, fmt.Errorf("failed to read token chain id: %w", err) return nil, fmt.Errorf("failed to read origin chain id: %w", err)
} }
originAddress := vaa.Address{} originAddress := vaa.Address{}
if n, err := reader.Read(originAddress[:]); err != nil || n != 32 { if n, err := reader.Read(originAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err) return nil, fmt.Errorf("failed to read origin address [%d]: %w", n, err)
} }
t.OriginAddress = originAddress t.OriginAddress = originAddress
if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil { if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil {
return nil, fmt.Errorf("failed to read token chain id: %w", err) return nil, fmt.Errorf("failed to read emitter chain id: %w", err)
}
emitterAddress := vaa.Address{}
if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
}
t.EmitterAddress = emitterAddress
msgIdLen := uint16(0)
if err := binary.Read(reader, binary.BigEndian, &msgIdLen); err != nil {
return nil, fmt.Errorf("failed to read msgID length: %w", err)
}
if msgIdLen > 0 {
msgID := make([]byte, msgIdLen)
n, err := reader.Read(msgID)
if err != nil || n != int(msgIdLen) {
return nil, fmt.Errorf("failed to read msg id [%d]: %w", n, err)
}
t.MsgID = string(msgID[:n])
}
hashLen := uint16(0)
if err := binary.Read(reader, binary.BigEndian, &hashLen); err != nil {
return nil, fmt.Errorf("failed to read hash length: %w", err)
}
if hashLen > 0 {
hash := make([]byte, hashLen)
n, err := reader.Read(hash)
if err != nil || n != int(hashLen) {
return nil, fmt.Errorf("failed to read hash [%d]: %w", n, err)
}
t.Hash = string(hash[:n])
}
if err := binary.Read(reader, binary.BigEndian, &t.TargetChain); err != nil {
return nil, fmt.Errorf("failed to read target chain id: %w", err)
}
targetAddress := vaa.Address{}
if n, err := reader.Read(targetAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read target address [%d]: %w", n, err)
}
t.TargetAddress = targetAddress
return t, nil
}
func unmarshalOldTransfer(data []byte) (*Transfer, error) {
t := &Transfer{}
reader := bytes.NewReader(data[:])
unixSeconds := uint32(0)
if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
return nil, fmt.Errorf("failed to read timestamp: %w", err)
}
t.Timestamp = time.Unix(int64(unixSeconds), 0)
if err := binary.Read(reader, binary.BigEndian, &t.Value); err != nil {
return nil, fmt.Errorf("failed to read value: %w", err)
}
if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil {
return nil, fmt.Errorf("failed to read origin chain id: %w", err)
}
originAddress := vaa.Address{}
if n, err := reader.Read(originAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read origin address [%d]: %w", n, err)
}
t.OriginAddress = originAddress
if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil {
return nil, fmt.Errorf("failed to read emitter chain id: %w", err)
} }
emitterAddress := vaa.Address{} emitterAddress := vaa.Address{}
@ -138,50 +218,7 @@ func UnmarshalTransfer(data []byte) (*Transfer, error) {
t.Hash = string(hash[:n]) t.Hash = string(hash[:n])
} }
return t, nil // Do not include the target chain or address.
}
func unmarshalOldTransfer(data []byte) (*Transfer, error) {
t := &Transfer{}
reader := bytes.NewReader(data[:])
unixSeconds := uint32(0)
if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil {
return nil, fmt.Errorf("failed to read timestamp: %w", err)
}
t.Timestamp = time.Unix(int64(unixSeconds), 0)
if err := binary.Read(reader, binary.BigEndian, &t.Value); err != nil {
return nil, fmt.Errorf("failed to read value: %w", err)
}
if err := binary.Read(reader, binary.BigEndian, &t.OriginChain); err != nil {
return nil, fmt.Errorf("failed to read token chain id: %w", err)
}
originAddress := vaa.Address{}
if n, err := reader.Read(originAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
}
t.OriginAddress = originAddress
if err := binary.Read(reader, binary.BigEndian, &t.EmitterChain); err != nil {
return nil, fmt.Errorf("failed to read token chain id: %w", err)
}
emitterAddress := vaa.Address{}
if n, err := reader.Read(emitterAddress[:]); err != nil || n != 32 {
return nil, fmt.Errorf("failed to read emitter address [%d]: %w", n, err)
}
t.EmitterAddress = emitterAddress
msgID := make([]byte, 256)
n, err := reader.Read(msgID)
if err != nil || n == 0 {
return nil, fmt.Errorf("failed to read vaa id [%d]: %w", n, err)
}
t.MsgID = string(msgID[:n])
return t, nil return t, nil
} }
@ -238,10 +275,10 @@ func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error)
return p, nil return p, nil
} }
const oldTransfer = "GOV:XFER:" const oldTransfer = "GOV:XFER2:"
const oldTransferLen = len(oldTransfer) const oldTransferLen = len(oldTransfer)
const transfer = "GOV:XFER2:" const transfer = "GOV:XFER3:"
const transferLen = len(transfer) const transferLen = len(transfer)
// Since we are changing the DB format of pending entries, we will use a new tag in the pending key field. // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field.

View File

@ -29,7 +29,10 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
xfer1 := &Transfer{ xfer1 := &Transfer{
@ -38,7 +41,9 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1", Hash: "Hash1",
} }
@ -51,12 +56,12 @@ func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
assert.Equal(t, xfer1, xfer2) assert.Equal(t, xfer1, xfer2)
expectedTransferKey := "GOV:XFER2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2))) assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
} }
func TestPendingMsgID(t *testing.T) { func TestPendingMsgID(t *testing.T) {
tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err) require.NoError(t, err)
msg1 := &common.MessagePublication{ msg1 := &common.MessagePublication{
@ -65,7 +70,7 @@ func TestPendingMsgID(t *testing.T) {
Nonce: 123456, Nonce: 123456,
Sequence: 789101112131415, Sequence: 789101112131415,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
Payload: []byte{}, Payload: []byte{},
ConsistencyLevel: 16, ConsistencyLevel: 16,
} }
@ -77,7 +82,10 @@ func TestTransferMsgID(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
xfer := &Transfer{ xfer := &Transfer{
@ -86,32 +94,34 @@ func TestTransferMsgID(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1", Hash: "Hash1",
} }
assert.Equal(t, []byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer)) assert.Equal(t, []byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
} }
func TestIsTransfer(t *testing.T) { func TestIsTransfer(t *testing.T) {
assert.Equal(t, true, IsTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:"))) assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:")))
assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:1"))) assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1")))
assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:1/1/1"))) assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1/1/1")))
assert.Equal(t, false, IsTransfer([]byte("GOV:XFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
assert.Equal(t, true, IsTransfer([]byte("GOV:XFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
assert.Equal(t, false, IsTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
assert.Equal(t, false, IsTransfer([]byte{0x01, 0x02, 0x03, 0x04})) assert.Equal(t, false, IsTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
assert.Equal(t, false, IsTransfer([]byte{})) assert.Equal(t, false, IsTransfer([]byte{}))
assert.Equal(t, true, isOldTransfer([]byte("GOV:XFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, true, isOldTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
assert.Equal(t, false, isOldTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, isOldTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
} }
func TestIsPendingMsg(t *testing.T) { func TestIsPendingMsg(t *testing.T) {
assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:")))
assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1")))
assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1")))
@ -132,7 +142,7 @@ func TestGetChainGovernorData(t *testing.T) {
} }
defer db.Close() defer db.Close()
logger, _ := zap.NewDevelopment() logger := zap.NewNop()
transfers, pending, err2 := db.GetChainGovernorData(logger) transfers, pending, err2 := db.GetChainGovernorData(logger)
@ -152,7 +162,10 @@ func TestStoreTransfer(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
xfer1 := &Transfer{ xfer1 := &Transfer{
@ -161,7 +174,9 @@ func TestStoreTransfer(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1", Hash: "Hash1",
} }
@ -181,7 +196,10 @@ func TestDeleteTransfer(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
xfer1 := &Transfer{ xfer1 := &Transfer{
@ -190,7 +208,9 @@ func TestDeleteTransfer(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1", Hash: "Hash1",
} }
@ -315,7 +335,10 @@ func TestStoreAndReloadTransfers(t *testing.T) {
defer db.Close() defer db.Close()
defer os.Remove(dbPath) defer os.Remove(dbPath)
tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
@ -327,7 +350,9 @@ func TestStoreAndReloadTransfers(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1", Hash: "Hash1",
} }
@ -341,7 +366,9 @@ func TestStoreAndReloadTransfers(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
Hash: "Hash2", Hash: "Hash2",
} }
@ -357,7 +384,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
Nonce: 123456, Nonce: 123456,
Sequence: 789101112131417, Sequence: 789101112131417,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
ConsistencyLevel: 16, ConsistencyLevel: 16,
}, },
@ -374,7 +401,7 @@ func TestStoreAndReloadTransfers(t *testing.T) {
Nonce: 123456, Nonce: 123456,
Sequence: 789101112131418, Sequence: 789101112131418,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
ConsistencyLevel: 16, ConsistencyLevel: 16,
}, },
@ -396,6 +423,164 @@ func TestStoreAndReloadTransfers(t *testing.T) {
assert.Equal(t, pending2, pending[1]) assert.Equal(t, pending2, pending[1])
} }
func TestMarshalUnmarshalNoMsgIdOrHash(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err)
ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err)
xfer1 := &Transfer{
Timestamp: time.Unix(int64(1654516425), 0),
Value: 125000,
OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
// Don't set MsgID or Hash, should handle empty slices.
}
bytes, err := xfer1.Marshal()
require.NoError(t, err)
xfer2, err := UnmarshalTransfer(bytes)
require.NoError(t, err)
require.Equal(t, xfer1, xfer2)
}
// Note that Transfer.Marshal can't fail, so there are no negative tests for that.
func TestUnmarshalTransferFailures(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err)
ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err)
xfer1 := &Transfer{
Timestamp: time.Unix(int64(1654516425), 0),
Value: 125000,
OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1",
}
bytes, err := xfer1.Marshal()
require.NoError(t, err)
// First make sure regular unmarshal works.
xfer2, err := UnmarshalTransfer(bytes)
require.NoError(t, err)
require.Equal(t, xfer1, xfer2)
// Truncate the timestamp.
_, err = UnmarshalTransfer(bytes[0 : 4-1])
assert.ErrorContains(t, err, "failed to read timestamp: ")
// Truncate the value.
_, err = UnmarshalTransfer(bytes[0 : 4+8-1])
assert.ErrorContains(t, err, "failed to read value: ")
// Truncate the origin chain.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2-1])
assert.ErrorContains(t, err, "failed to read origin chain id: ")
// Truncate the origin address.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32-1])
assert.ErrorContains(t, err, "failed to read origin address")
// Truncate the emitter chain.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2-1])
assert.ErrorContains(t, err, "failed to read emitter chain id: ")
// Truncate the emitter address.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32-1])
assert.ErrorContains(t, err, "failed to read emitter address")
// Truncate the message ID length.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2-1])
assert.ErrorContains(t, err, "failed to read msgID length: ")
// Truncate the message ID data.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2+3])
assert.ErrorContains(t, err, "failed to read msg id")
// Truncate the hash length.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2+82+2-1])
assert.ErrorContains(t, err, "failed to read hash length: ")
// Truncate the hash data.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2+82+2+3])
assert.ErrorContains(t, err, "failed to read hash")
// Truncate the target chain.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2+82+2+5+2-1])
assert.ErrorContains(t, err, "failed to read target chain id: ")
// Truncate the target address.
_, err = UnmarshalTransfer(bytes[0 : 4+8+2+32+2+32+2+82+2+5+2+32-1])
assert.ErrorContains(t, err, "failed to read target address")
}
// Note that PendingTransfer.Marshal can't fail, so there are no negative tests for that.
func TestUnmarshalPendingTransferFailures(t *testing.T) {
tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
msg := common.MessagePublication{
TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
Timestamp: time.Unix(int64(1654516425), 0),
Nonce: 123456,
Sequence: 789101112131415,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr,
Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
ConsistencyLevel: 16,
IsReobservation: true,
}
pending1 := &PendingTransfer{
ReleaseTime: time.Unix(int64(1654516425+72*60*60), 0),
Msg: msg,
}
bytes, err := pending1.Marshal()
require.NoError(t, err)
// First make sure regular unmarshal works.
pending2, err := UnmarshalPendingTransfer(bytes, false)
require.NoError(t, err)
assert.Equal(t, pending1, pending2)
// Truncate the release time.
_, err = UnmarshalPendingTransfer(bytes[0:4-1], false)
assert.ErrorContains(t, err, "failed to read pending transfer release time: ")
// The remainder is the marshaled message publication as a single buffer.
// Truncate the entire serialized message.
_, err = UnmarshalPendingTransfer(bytes[0:4], false)
assert.ErrorContains(t, err, "failed to read pending transfer msg")
// Truncate some of the serialized message.
_, err = UnmarshalPendingTransfer(bytes[0:len(bytes)-10], false)
assert.ErrorContains(t, err, "failed to unmarshal pending transfer msg")
}
func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) { func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
@ -439,39 +624,76 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
defer db.Close() defer db.Close()
defer os.Remove(dbPath) defer os.Remove(dbPath)
tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
xfer1 := &Transfer{ oldXfer1 := &Transfer{
Timestamp: time.Unix(int64(1654516425), 0), Timestamp: time.Unix(int64(1654516425), 0),
Value: 125000, Value: 125000,
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", // Don't set TargetChain or TargetAddress.
Hash: "Hash1", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1",
} }
err = db.StoreTransfer(xfer1) err = db.storeOldTransfer(oldXfer1)
require.Nil(t, err) require.NoError(t, err)
xfer2 := &Transfer{ newXfer1 := &Transfer{
Timestamp: time.Unix(int64(1654516430), 0), Timestamp: time.Unix(int64(1654516426), 0),
Value: 125000, Value: 125000,
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
Hash: "Hash1",
}
err = db.StoreTransfer(newXfer1)
require.NoError(t, err)
oldXfer2 := &Transfer{
Timestamp: time.Unix(int64(1654516427), 0),
Value: 125000,
OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: ethereumTokenBridgeAddr,
// Don't set TargetChain or TargetAddress.
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131417",
Hash: "Hash2",
}
err = db.storeOldTransfer(oldXfer2)
require.NoError(t, err)
newXfer2 := &Transfer{
Timestamp: time.Unix(int64(1654516428), 0),
Value: 125000,
OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131418",
Hash: "Hash2", Hash: "Hash2",
} }
err = db.StoreTransfer(xfer2) err = db.StoreTransfer(newXfer2)
require.Nil(t, err) require.NoError(t, err)
now := time.Unix(time.Now().Unix(), 0) now := time.Unix(time.Now().Unix(), 0)
@ -484,7 +706,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
Nonce: 123456, Nonce: 123456,
Sequence: 789101112131417, Sequence: 789101112131417,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
ConsistencyLevel: 16, ConsistencyLevel: 16,
// IsReobservation will not be serialized. It should be set to false on reload. // IsReobservation will not be serialized. It should be set to false on reload.
@ -492,7 +714,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
} }
db.storeOldPendingMsg(t, pending1) db.storeOldPendingMsg(t, pending1)
require.Nil(t, err) require.NoError(t, err)
now2 := now.Add(time.Second * 5) now2 := now.Add(time.Second * 5)
@ -505,7 +727,7 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
Nonce: 123456, Nonce: 123456,
Sequence: 789101112131418, Sequence: 789101112131418,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
ConsistencyLevel: 16, ConsistencyLevel: 16,
IsReobservation: true, IsReobservation: true,
@ -513,22 +735,29 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
} }
err = db.StorePendingMsg(pending2) err = db.StorePendingMsg(pending2)
require.Nil(t, err) require.NoError(t, err)
logger := zap.NewNop() logger := zap.NewNop()
xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now) xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
require.Nil(t, err) require.NoError(t, err)
require.Equal(t, 2, len(xfers)) require.Equal(t, 4, len(xfers))
require.Equal(t, 2, len(pendings)) require.Equal(t, 2, len(pendings))
sort.SliceStable(xfers, func(i, j int) bool {
return xfers[i].Timestamp.Before(xfers[j].Timestamp)
})
assert.Equal(t, oldXfer1, xfers[0])
assert.Equal(t, newXfer1, xfers[1])
assert.Equal(t, oldXfer2, xfers[2])
assert.Equal(t, newXfer2, xfers[3])
// Updated old pending events get placed at the end, so we need to sort into timestamp order. // Updated old pending events get placed at the end, so we need to sort into timestamp order.
sort.SliceStable(pendings, func(i, j int) bool { sort.SliceStable(pendings, func(i, j int) bool {
return pendings[i].Msg.Timestamp.Before(pendings[j].Msg.Timestamp) return pendings[i].Msg.Timestamp.Before(pendings[j].Msg.Timestamp)
}) })
assert.Equal(t, xfer1, xfers[0])
assert.Equal(t, xfer2, xfers[1])
assert.Equal(t, pending1.Msg, pendings[0].Msg) assert.Equal(t, pending1.Msg, pendings[0].Msg)
assert.Equal(t, pending2.Msg, pendings[1].Msg) assert.Equal(t, pending2.Msg, pendings[1].Msg)
@ -536,25 +765,40 @@ func TestLoadingOldPendingTransfers(t *testing.T) {
xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now) xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now)
require.Nil(t, err) require.NoError(t, err)
require.Equal(t, 2, len(xfers2)) require.Equal(t, 4, len(xfers2))
require.Equal(t, 2, len(pendings2)) require.Equal(t, 2, len(pendings2))
assert.Equal(t, xfer1, xfers2[0]) sort.SliceStable(xfers2, func(i, j int) bool {
assert.Equal(t, xfer2, xfers2[1]) return xfers2[i].Timestamp.Before(xfers2[j].Timestamp)
})
assert.Equal(t, oldXfer1, xfers2[0])
assert.Equal(t, newXfer1, xfers2[1])
assert.Equal(t, oldXfer2, xfers2[2])
assert.Equal(t, newXfer2, xfers2[3])
assert.Equal(t, pending1.Msg, pendings2[0].Msg) assert.Equal(t, pending1.Msg, pendings2[0].Msg)
assert.Equal(t, pending2.Msg, pendings2[1].Msg) assert.Equal(t, pending2.Msg, pendings2[1].Msg)
} }
func marshalOldTransfer(xfer *Transfer) []byte { func marshalOldTransfer(xfer *Transfer) []byte {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
vaa.MustWrite(buf, binary.BigEndian, uint32(xfer.Timestamp.Unix())) vaa.MustWrite(buf, binary.BigEndian, uint32(xfer.Timestamp.Unix()))
vaa.MustWrite(buf, binary.BigEndian, xfer.Value) vaa.MustWrite(buf, binary.BigEndian, xfer.Value)
vaa.MustWrite(buf, binary.BigEndian, xfer.OriginChain) vaa.MustWrite(buf, binary.BigEndian, xfer.OriginChain)
buf.Write(xfer.OriginAddress[:]) buf.Write(xfer.OriginAddress[:])
vaa.MustWrite(buf, binary.BigEndian, xfer.EmitterChain) vaa.MustWrite(buf, binary.BigEndian, xfer.EmitterChain)
buf.Write(xfer.EmitterAddress[:]) buf.Write(xfer.EmitterAddress[:])
buf.Write([]byte(xfer.MsgID)) vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.MsgID)))
if len(xfer.MsgID) > 0 {
buf.Write([]byte(xfer.MsgID))
}
vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.Hash)))
if len(xfer.Hash) > 0 {
buf.Write([]byte(xfer.Hash))
}
return buf.Bytes() return buf.Bytes()
} }
@ -574,7 +818,7 @@ func TestDeserializeOfOldTransfer(t *testing.T) {
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
require.NoError(t, err) require.NoError(t, err)
tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err) require.NoError(t, err)
xfer1 := &Transfer{ xfer1 := &Transfer{
@ -583,9 +827,10 @@ func TestDeserializeOfOldTransfer(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", // Don't set TargetChain or TargetAddress.
// Do not set the Hash. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
Hash: "Hash1",
} }
bytes := marshalOldTransfer(xfer1) bytes := marshalOldTransfer(xfer1)
@ -595,7 +840,7 @@ func TestDeserializeOfOldTransfer(t *testing.T) {
assert.Equal(t, xfer1, xfer2) assert.Equal(t, xfer1, xfer2)
expectedTransferKey := "GOV:XFER2:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2))) assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
} }
@ -608,7 +853,10 @@ func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
defer db.Close() defer db.Close()
defer os.Remove(dbPath) defer os.Remove(dbPath)
tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
require.NoError(t, err)
bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
require.NoError(t, err) require.NoError(t, err)
tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
@ -621,8 +869,9 @@ func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415", // Don't set TargetChain or TargetAddress.
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
// Do not set the Hash. // Do not set the Hash.
} }
@ -636,7 +885,9 @@ func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
OriginChain: vaa.ChainIDEthereum, OriginChain: vaa.ChainIDEthereum,
OriginAddress: tokenAddr, OriginAddress: tokenAddr,
EmitterChain: vaa.ChainIDEthereum, EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr, EmitterAddress: ethereumTokenBridgeAddr,
TargetChain: vaa.ChainIDBSC,
TargetAddress: bscTokenBridgeAddr,
MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416", MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
Hash: "Hash2", Hash: "Hash2",
} }

View File

@ -427,6 +427,8 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
OriginAddress: token.token.addr, OriginAddress: token.token.addr,
EmitterChain: msg.EmitterChain, EmitterChain: msg.EmitterChain,
EmitterAddress: msg.EmitterAddress, EmitterAddress: msg.EmitterAddress,
TargetChain: payload.TargetChain,
TargetAddress: payload.TargetAddress,
MsgID: msg.MessageIDString(), MsgID: msg.MessageIDString(),
Hash: hash, Hash: hash,
} }
@ -579,29 +581,41 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
zap.String("msgID", pe.dbData.Msg.MessageIDString())) zap.String("msgID", pe.dbData.Msg.MessageIDString()))
} }
// If we get here, publish it and remove it from the pending list. payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload)
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if err != nil {
gov.logger.Error("failed to decode payload for pending VAA, dropping it",
if countsTowardsTransfers { zap.String("msgID", pe.dbData.Msg.MessageIDString()),
xfer := db.Transfer{Timestamp: now, zap.String("hash", pe.hash),
Value: value, zap.Error(err),
OriginChain: pe.token.token.chain, )
OriginAddress: pe.token.token.addr, delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below.
EmitterChain: pe.dbData.Msg.EmitterChain,
EmitterAddress: pe.dbData.Msg.EmitterAddress,
MsgID: pe.dbData.Msg.MessageIDString(),
Hash: pe.hash,
}
if err := gov.db.StoreTransfer(&xfer); err != nil {
gov.msgsToPublish = msgsToPublish
return nil, err
}
ce.transfers = append(ce.transfers, &xfer)
gov.msgsSeen[pe.hash] = transferComplete
} else { } else {
delete(gov.msgsSeen, pe.hash) // If we get here, publish it and remove it from the pending list.
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)
if countsTowardsTransfers {
xfer := db.Transfer{Timestamp: now,
Value: value,
OriginChain: pe.token.token.chain,
OriginAddress: pe.token.token.addr,
EmitterChain: pe.dbData.Msg.EmitterChain,
EmitterAddress: pe.dbData.Msg.EmitterAddress,
TargetChain: payload.TargetChain,
TargetAddress: payload.TargetAddress,
MsgID: pe.dbData.Msg.MessageIDString(),
Hash: pe.hash,
}
if err := gov.db.StoreTransfer(&xfer); err != nil {
gov.msgsToPublish = msgsToPublish
return nil, err
}
ce.transfers = append(ce.transfers, &xfer)
gov.msgsSeen[pe.hash] = transferComplete
} else {
delete(gov.msgsSeen, pe.hash)
}
} }
if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil { if err := gov.db.DeletePendingMsg(&pe.dbData); err != nil {

View File

@ -20,6 +20,9 @@ import (
"github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/db"
"github.com/wormhole-foundation/wormhole/sdk/vaa" "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
) )
// This is so we can have consistent config data for unit tests. // This is so we can have consistent config data for unit tests.
@ -113,6 +116,18 @@ func (gov *ChainGovernor) getStatsForAllChains() (numTrans int, valueTrans uint6
return return
} }
func checkTargetOnReleasedIsSet(t *testing.T, toBePublished []*common.MessagePublication, targetChain vaa.ChainID, targetAddressStr string) {
require.NotEqual(t, 0, len(toBePublished))
toAddr, err := vaa.StringToAddress(targetAddressStr)
require.NoError(t, err)
for _, msg := range toBePublished {
payload, err := vaa.DecodeTransferPayloadHdr(msg.Payload)
require.NoError(t, err)
assert.Equal(t, targetChain, payload.TargetChain)
assert.Equal(t, toAddr, payload.TargetAddress)
}
}
func TestTrimEmptyTransfers(t *testing.T) { func TestTrimEmptyTransfers(t *testing.T) {
ctx := context.Background() ctx := context.Background()
gov, err := newChainGovernorForTest(ctx) gov, err := newChainGovernorForTest(ctx)
@ -244,11 +259,14 @@ func TestTrimmingAllTransfersShouldReturnZero(t *testing.T) {
} }
func newChainGovernorForTest(ctx context.Context) (*ChainGovernor, error) { func newChainGovernorForTest(ctx context.Context) (*ChainGovernor, error) {
return newChainGovernorForTestWithLogger(ctx, zap.NewNop())
}
func newChainGovernorForTestWithLogger(ctx context.Context, logger *zap.Logger) (*ChainGovernor, error) {
if ctx == nil { if ctx == nil {
return nil, fmt.Errorf("ctx is nil") return nil, fmt.Errorf("ctx is nil")
} }
logger := zap.NewNop()
var db db.MockGovernorDB var db db.MockGovernorDB
gov := NewChainGovernor(logger, &db, common.GoTest) gov := NewChainGovernor(logger, &db, common.GoTest)
@ -784,6 +802,7 @@ func TestPendingTransferBeingReleased(t *testing.T) {
toBePublished, err = gov.CheckPendingForTime(now) toBePublished, err = gov.CheckPendingForTime(now)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, len(toBePublished)) assert.Equal(t, 1, len(toBePublished))
checkTargetOnReleasedIsSet(t, toBePublished, vaa.ChainIDPolygon, toAddrStr)
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
require.NoError(t, err) require.NoError(t, err)
@ -1011,6 +1030,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) {
toBePublished, err = gov.CheckPendingForTime(now) toBePublished, err = gov.CheckPendingForTime(now)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 2, len(toBePublished)) assert.Equal(t, 2, len(toBePublished))
checkTargetOnReleasedIsSet(t, toBePublished, vaa.ChainIDPolygon, toAddrStr)
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
require.NoError(t, err) require.NoError(t, err)
@ -1234,6 +1254,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
toBePublished, err = gov.CheckPendingForTime(now) toBePublished, err = gov.CheckPendingForTime(now)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, len(toBePublished)) assert.Equal(t, 1, len(toBePublished))
checkTargetOnReleasedIsSet(t, toBePublished, vaa.ChainIDPolygon, toAddrStr)
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
require.NoError(t, err) require.NoError(t, err)
@ -1324,6 +1345,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
toBePublished, err = gov.CheckPendingForTime(now) toBePublished, err = gov.CheckPendingForTime(now)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, len(toBePublished)) assert.Equal(t, 1, len(toBePublished))
checkTargetOnReleasedIsSet(t, toBePublished, vaa.ChainIDPolygon, toAddrStr)
numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains() numTrans, valueTrans, numPending, valuePending = gov.getStatsForAllChains()
assert.Equal(t, false, canPost) assert.Equal(t, false, canPost)
@ -1792,3 +1814,125 @@ func TestCoinGeckoQueries(t *testing.T) {
}) })
} }
} }
// setupLogsCapture is a helper function for making a zap logger/observer combination for testing that certain logs have been made
func setupLogsCapture(t testing.TB, options ...zap.Option) (*zap.Logger, *observer.ObservedLogs) {
t.Helper()
observedCore, observedLogs := observer.New(zap.InfoLevel)
consoleLogger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
parentLogger := zap.New(zapcore.NewTee(observedCore, consoleLogger.Core()), options...)
return parentLogger, observedLogs
}
func TestPendingTransferWithBadPayloadGetsDroppedNotReleased(t *testing.T) {
ctx := context.Background()
zapLogger, zapObserver := setupLogsCapture(t)
gov, err := newChainGovernorForTestWithLogger(ctx, zapLogger)
require.NoError(t, err)
require.NotNil(t, gov)
tokenAddrStr := "0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E" //nolint:gosec
toAddrStr := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8"
tokenBridgeAddrStr := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
tokenBridgeAddr, err := vaa.StringToAddress(tokenBridgeAddrStr)
require.NoError(t, err)
gov.setDayLengthInMinutes(24 * 60)
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStr, 10000, 100000)
require.NoError(t, err)
err = gov.setTokenForTesting(vaa.ChainIDEthereum, tokenAddrStr, "WETH", 1774.62)
require.NoError(t, err)
// Create two big transactions.
msg1 := common.MessagePublication{
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
Timestamp: time.Unix(int64(1654543099), 0),
Nonce: uint32(1),
Sequence: uint64(1),
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr,
ConsistencyLevel: uint8(32),
Payload: buildMockTransferPayloadBytes(1,
vaa.ChainIDEthereum,
tokenAddrStr,
vaa.ChainIDPolygon,
toAddrStr,
5000,
),
}
msg2 := common.MessagePublication{
TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"),
Timestamp: time.Unix(int64(1654543099), 0),
Nonce: uint32(2),
Sequence: uint64(2),
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddr,
ConsistencyLevel: uint8(32),
Payload: buildMockTransferPayloadBytes(1,
vaa.ChainIDEthereum,
tokenAddrStr,
vaa.ChainIDPolygon,
toAddrStr,
5000,
),
}
// Post the two big transfers and and verify they get enqueued.
now, _ := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)")
canPost, err := gov.ProcessMsgForTime(&msg1, now)
require.NoError(t, err)
assert.Equal(t, false, canPost)
canPost, err = gov.ProcessMsgForTime(&msg2, now)
require.NoError(t, err)
assert.Equal(t, false, canPost)
numTrans, _, numPending, _ := gov.getStatsForAllChains()
assert.Equal(t, 2, len(gov.msgsSeen))
assert.Equal(t, 0, numTrans)
assert.Equal(t, 2, numPending)
// Corrupt the payload of msg2 so that when we try to release it, it will get dropped.
gov.mutex.Lock()
ce, exists := gov.chains[vaa.ChainIDEthereum]
require.True(t, exists)
require.Equal(t, 2, len(ce.pending))
require.Equal(t, "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/2", ce.pending[1].dbData.Msg.MessageIDString())
ce.pending[1].dbData.Msg.Payload = nil
gov.mutex.Unlock()
// After 24hrs, msg1 should get released but msg2 should get dropped.
now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 12:01pm (CST)")
toBePublished, err := gov.CheckPendingForTime(now)
require.NoError(t, err)
assert.Equal(t, 1, len(toBePublished))
checkTargetOnReleasedIsSet(t, toBePublished, vaa.ChainIDPolygon, toAddrStr)
assert.Equal(t, "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/1", toBePublished[0].MessageIDString())
// Verify that we got the expected error in the logs.
loggedEntries := zapObserver.FilterMessage("failed to decode payload for pending VAA, dropping it").All()
require.Equal(t, 1, len(loggedEntries))
foundIt := false
for _, f := range loggedEntries[0].Context {
if f.Key == "msgID" && f.String == "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/2" {
foundIt = true
}
}
assert.True(t, foundIt)
// Verify that the message is no longer pending.
gov.mutex.Lock()
ce, exists = gov.chains[vaa.ChainIDEthereum]
require.True(t, exists)
assert.Equal(t, 0, len(ce.pending))
gov.mutex.Unlock()
// Neither one should be in the map of messages seen.
_, exists = gov.msgsSeen[gov.HashFromMsg(&msg1)]
assert.False(t, exists)
_, exists = gov.msgsSeen[gov.HashFromMsg(&msg2)]
assert.False(t, exists)
}