diff --git a/db/migrations/000007_add_raw_message_field.down.sql b/db/migrations/000007_add_raw_message_field.down.sql new file mode 100644 index 0000000..b61e4a4 --- /dev/null +++ b/db/migrations/000007_add_raw_message_field.down.sql @@ -0,0 +1,4 @@ +ALTER TABLE messages + DROP COLUMN raw_message; +ALTER TABLE erc_to_native_messages + DROP COLUMN raw_message; diff --git a/db/migrations/000007_add_raw_message_field.up.sql b/db/migrations/000007_add_raw_message_field.up.sql new file mode 100644 index 0000000..98fb33b --- /dev/null +++ b/db/migrations/000007_add_raw_message_field.up.sql @@ -0,0 +1,62 @@ +ALTER TABLE messages + ADD COLUMN raw_message BYTEA; +ALTER TABLE erc_to_native_messages + ADD COLUMN raw_message BYTEA; + +UPDATE messages m +SET raw_message = l.transaction_hash || substr(l.data, 65, get_byte(l.data, 62) * 256 + get_byte(l.data, 63)) +FROM sent_messages sm + JOIN logs l + ON l.id = sm.log_id +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND l.topic0 IN ( + E'\\x9df71e9d2175354c68d0e882702c9eb5d63e036345c53639b8c63be4e8764741', + E'\\x733b62005ae93e850dd2b37234e1c7eb634d3b7de068bc0f7f32b7233191a48c' + ); + +UPDATE messages m +SET raw_message = substr(l.data, 65, get_byte(l.data, 62) * 256 + get_byte(l.data, 63)) +FROM sent_messages sm + JOIN logs l + ON l.id = sm.log_id +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND l.topic0 IN ( + E'\\x482515ce3d9494a37ce83f18b72b363449458435fafdd7a53ddea7460fe01b58', + E'\\x520d2afde79cbd5db58755ac9480f81bc658e5c517fcae7365a3d832590b0183' + ); + +UPDATE erc_to_native_messages m +SET raw_message = substr(l.data, 13) || l.transaction_hash || E'\\x4aa42145Aa6Ebf72e164C9bBC74fbD3788045016' +FROM sent_messages sm + JOIN logs l + ON l.id = sm.log_id +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND m.direction::direction_enum = 'home_to_foreign'; + +UPDATE erc_to_native_messages m +SET raw_message = substr(l.data, 13) || l.transaction_hash +FROM sent_messages sm + JOIN logs l + ON l.id = sm.log_id +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND m.direction::direction_enum = 'foreign_to_home' + AND l.topic0 = E'\\x1d491a427d1f8cc0d447496f300fac39f7306122481d8e663451eb268274146b'; + +UPDATE erc_to_native_messages m +SET raw_message = substr(l.topic1, 13) || l.data || l.transaction_hash +FROM sent_messages sm + JOIN logs l + ON l.id = sm.log_id +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND m.direction::direction_enum = 'foreign_to_home' + AND l.topic0 = E'\\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'; + +ALTER TABLE messages + ALTER COLUMN raw_message TYPE BLOB; +ALTER TABLE erc_to_native_messages + ALTER COLUMN raw_message TYPE BLOB; diff --git a/entity/erc_to_native_message.go b/entity/erc_to_native_message.go index a93a5cd..944cca7 100644 --- a/entity/erc_to_native_message.go +++ b/entity/erc_to_native_message.go @@ -8,15 +8,16 @@ import ( ) type ErcToNativeMessage struct { - ID uint `db:"id"` - BridgeID string `db:"bridge_id"` - MsgHash common.Hash `db:"msg_hash"` - Direction Direction `db:"direction"` - Sender common.Address `db:"sender"` - Receiver common.Address `db:"receiver"` - Value string `db:"value"` - CreatedAt *time.Time `db:"created_at"` - UpdatedAt *time.Time `db:"updated_at"` + ID uint `db:"id"` + BridgeID string `db:"bridge_id"` + MsgHash common.Hash `db:"msg_hash"` + Direction Direction `db:"direction"` + Sender common.Address `db:"sender"` + Receiver common.Address `db:"receiver"` + Value string `db:"value"` + RawMessage []byte `db:"raw_message"` + CreatedAt *time.Time `db:"created_at"` + UpdatedAt *time.Time `db:"updated_at"` } type ErcToNativeMessagesRepo interface { diff --git a/entity/log.go b/entity/log.go index bb3e79b..4649108 100644 --- a/entity/log.go +++ b/entity/log.go @@ -30,10 +30,10 @@ type LogsFilter struct { FromBlock *uint ToBlock *uint TxHash *common.Hash - Topic0 *common.Hash - Topic1 *common.Hash - Topic2 *common.Hash - Topic3 *common.Hash + Topic0 []common.Hash + Topic1 []common.Hash + Topic2 []common.Hash + Topic3 []common.Hash DataLength *uint } diff --git a/entity/message.go b/entity/message.go index 2f37091..3ee81d4 100644 --- a/entity/message.go +++ b/entity/message.go @@ -15,18 +15,19 @@ const ( ) type Message struct { - ID uint `db:"id"` - BridgeID string `db:"bridge_id"` - MsgHash common.Hash `db:"msg_hash"` - MessageID common.Hash `db:"message_id"` - Direction Direction `db:"direction"` - Sender common.Address `db:"sender"` - Executor common.Address `db:"executor"` - Data []byte `db:"data"` - DataType uint `db:"data_type"` - GasLimit uint `db:"gas_limit"` - CreatedAt *time.Time `db:"created_at"` - UpdatedAt *time.Time `db:"updated_at"` + ID uint `db:"id"` + BridgeID string `db:"bridge_id"` + MsgHash common.Hash `db:"msg_hash"` + MessageID common.Hash `db:"message_id"` + Direction Direction `db:"direction"` + Sender common.Address `db:"sender"` + Executor common.Address `db:"executor"` + Data []byte `db:"data"` + DataType uint `db:"data_type"` + GasLimit uint `db:"gas_limit"` + RawMessage []byte `db:"raw_message"` + CreatedAt *time.Time `db:"created_at"` + UpdatedAt *time.Time `db:"updated_at"` } type MessagesRepo interface { diff --git a/entity/signed_message.go b/entity/signed_message.go index 7948ae6..66fb20e 100644 --- a/entity/signed_message.go +++ b/entity/signed_message.go @@ -19,6 +19,6 @@ type SignedMessage struct { type SignedMessagesRepo interface { Ensure(ctx context.Context, msg *SignedMessage) error GetByLogID(ctx context.Context, logID uint) (*SignedMessage, error) - FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*SignedMessage, error) + FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*SignedMessage, error) GetLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error) } diff --git a/monitor/decoders.go b/monitor/decoders.go index 92273f0..44d04e6 100644 --- a/monitor/decoders.go +++ b/monitor/decoders.go @@ -14,28 +14,30 @@ func unmarshalMessage(bridgeID string, direction entity.Direction, encodedData [ messageID := common.BytesToHash(encodedData[:32]) if bytes.Equal(messageID[0:4], []byte{0, 4, 0, 0}) { return &entity.Message{ - BridgeID: bridgeID, - Direction: direction, - MsgHash: crypto.Keccak256Hash(encodedData), - MessageID: messageID, - Sender: common.BytesToAddress(encodedData[64:84]), - Executor: common.BytesToAddress(encodedData[84:104]), - GasLimit: uint(binary.BigEndian.Uint32(encodedData[104:108])), - DataType: uint(encodedData[108]), - Data: encodedData[108:], + BridgeID: bridgeID, + Direction: direction, + MsgHash: crypto.Keccak256Hash(encodedData), + MessageID: messageID, + Sender: common.BytesToAddress(encodedData[64:84]), + Executor: common.BytesToAddress(encodedData[84:104]), + GasLimit: uint(binary.BigEndian.Uint32(encodedData[104:108])), + DataType: uint(encodedData[108]), + Data: encodedData[108:], + RawMessage: encodedData, } } if bytes.Equal(messageID[0:4], []byte{0, 5, 0, 0}) { return &entity.Message{ - BridgeID: bridgeID, - Direction: direction, - MsgHash: crypto.Keccak256Hash(encodedData), - MessageID: messageID, - Sender: common.BytesToAddress(encodedData[32:52]), - Executor: common.BytesToAddress(encodedData[52:72]), - GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])), - DataType: uint(encodedData[78]), - Data: encodedData[79+encodedData[76]+encodedData[77]:], + BridgeID: bridgeID, + Direction: direction, + MsgHash: crypto.Keccak256Hash(encodedData), + MessageID: messageID, + Sender: common.BytesToAddress(encodedData[32:52]), + Executor: common.BytesToAddress(encodedData[52:72]), + GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])), + DataType: uint(encodedData[78]), + Data: encodedData[79+encodedData[76]+encodedData[77]:], + RawMessage: encodedData, } } panic("unsupported message version prefix") @@ -48,15 +50,16 @@ func unmarshalLegacyMessage(bridgeID string, direction entity.Direction, encoded } return &entity.Message{ - BridgeID: bridgeID, - Direction: direction, - MsgHash: crypto.Keccak256Hash(encodedData), - MessageID: common.BytesToHash(encodedData[:32]), - Sender: common.BytesToAddress(encodedData[32:52]), - Executor: common.BytesToAddress(encodedData[52:72]), - GasLimit: uint(binary.BigEndian.Uint32(encodedData[100:104])), - DataType: 0, - Data: encodedData[105:], + BridgeID: bridgeID, + Direction: direction, + MsgHash: crypto.Keccak256Hash(encodedData), + MessageID: common.BytesToHash(encodedData[:32]), + Sender: common.BytesToAddress(encodedData[32:52]), + Executor: common.BytesToAddress(encodedData[52:72]), + GasLimit: uint(binary.BigEndian.Uint32(encodedData[100:104])), + DataType: 0, + Data: encodedData[105:], + RawMessage: encodedData, } } diff --git a/monitor/handlers.go b/monitor/handlers.go index 6ee2d80..0e0694e 100644 --- a/monitor/handlers.go +++ b/monitor/handlers.go @@ -95,7 +95,7 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log FromBlock: &log.BlockNumber, ToBlock: &log.BlockNumber, TxHash: &log.TransactionHash, - Topic0: &bridgeabi.ErcToNativeUserRequestForAffirmationEventSignature, + Topic0: []common.Hash{bridgeabi.ErcToNativeUserRequestForAffirmationEventSignature}, } logs, err := p.repo.Logs.Find(ctx, filter) if err != nil { @@ -112,12 +112,13 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log msgHash := crypto.Keccak256Hash(msg) message := &entity.ErcToNativeMessage{ - BridgeID: p.bridgeID, - Direction: entity.DirectionForeignToHome, - MsgHash: msgHash, - Sender: from, - Receiver: from, - Value: value.String(), + BridgeID: p.bridgeID, + Direction: entity.DirectionForeignToHome, + MsgHash: msgHash, + Sender: from, + Receiver: from, + Value: value.String(), + RawMessage: msg, } err = p.repo.ErcToNativeMessages.Ensure(ctx, message) if err != nil { @@ -152,8 +153,8 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont FromBlock: &log.BlockNumber, ToBlock: &log.BlockNumber, TxHash: &log.TransactionHash, - Topic0: &bridgeabi.ErcToNativeTransferEventSignature, - Topic2: hashPtr(p.cfg.Foreign.Address.Hash()), + Topic0: []common.Hash{bridgeabi.ErcToNativeTransferEventSignature}, + Topic2: []common.Hash{p.cfg.Foreign.Address.Hash()}, DataLength: uintPtr(32), } logs, err := p.repo.Logs.Find(ctx, filter) @@ -169,12 +170,13 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont } message := &entity.ErcToNativeMessage{ - BridgeID: p.bridgeID, - Direction: entity.DirectionForeignToHome, - MsgHash: msgHash, - Sender: sender, - Receiver: recipient, - Value: value.String(), + BridgeID: p.bridgeID, + Direction: entity.DirectionForeignToHome, + MsgHash: msgHash, + Sender: sender, + Receiver: recipient, + Value: value.String(), + RawMessage: msg, } err = p.repo.ErcToNativeMessages.Ensure(ctx, message) if err != nil { @@ -252,12 +254,13 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForSignature(ctx contex } message := &entity.ErcToNativeMessage{ - BridgeID: p.bridgeID, - Direction: entity.DirectionHomeToForeign, - MsgHash: msgHash, - Sender: sender, - Receiver: recipient, - Value: value.String(), + BridgeID: p.bridgeID, + Direction: entity.DirectionHomeToForeign, + MsgHash: msgHash, + Sender: sender, + Receiver: recipient, + Value: value.String(), + RawMessage: msg, } err = p.repo.ErcToNativeMessages.Ensure(ctx, message) if err != nil { diff --git a/monitor/utils.go b/monitor/utils.go index 438af0b..967c749 100644 --- a/monitor/utils.go +++ b/monitor/utils.go @@ -1,11 +1,5 @@ package monitor -import "github.com/ethereum/go-ethereum/common" - func uintPtr(v uint) *uint { return &v } - -func hashPtr(v common.Hash) *common.Hash { - return &v -} diff --git a/repository/postgres/erc_to_native_messages.go b/repository/postgres/erc_to_native_messages.go index e766657..057388c 100644 --- a/repository/postgres/erc_to_native_messages.go +++ b/repository/postgres/erc_to_native_messages.go @@ -19,8 +19,8 @@ func NewErcToNativeMessagesRepo(table string, db *db.DB) entity.ErcToNativeMessa func (r *ercToNativeMessagesRepo) Ensure(ctx context.Context, msg *entity.ErcToNativeMessage) error { q, args, err := sq.Insert(r.table). - Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value"). - Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value). + Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value", "raw_message"). + Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value, msg.RawMessage). Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET sender = EXCLUDED.sender, updated_at = NOW()"). PlaceholderFormat(sq.Dollar). ToSql() diff --git a/repository/postgres/logs.go b/repository/postgres/logs.go index 201e839..a5f7523 100644 --- a/repository/postgres/logs.go +++ b/repository/postgres/logs.go @@ -82,17 +82,17 @@ func (r *logsRepo) Find(ctx context.Context, filter entity.LogsFilter) ([]*entit if filter.TxHash != nil { cond = append(cond, sq.Eq{"transaction_hash": *filter.TxHash}) } - if filter.Topic0 != nil { - cond = append(cond, sq.Eq{"topic0": *filter.Topic0}) + if len(filter.Topic0) > 0 { + cond = append(cond, sq.Eq{"topic0": filter.Topic0}) } - if filter.Topic1 != nil { - cond = append(cond, sq.Eq{"topic1": *filter.Topic1}) + if len(filter.Topic1) > 0 { + cond = append(cond, sq.Eq{"topic1": filter.Topic1}) } - if filter.Topic2 != nil { - cond = append(cond, sq.Eq{"topic2": *filter.Topic2}) + if len(filter.Topic2) > 0 { + cond = append(cond, sq.Eq{"topic2": filter.Topic2}) } - if filter.Topic3 != nil { - cond = append(cond, sq.Eq{"topic3": *filter.Topic3}) + if len(filter.Topic3) > 0 { + cond = append(cond, sq.Eq{"topic3": filter.Topic3}) } if filter.DataLength != nil { cond = append(cond, sq.Eq{"length(data)": *filter.DataLength}) diff --git a/repository/postgres/messages.go b/repository/postgres/messages.go index b62621c..ecbba09 100644 --- a/repository/postgres/messages.go +++ b/repository/postgres/messages.go @@ -19,8 +19,8 @@ func NewMessagesRepo(table string, db *db.DB) entity.MessagesRepo { func (r *messagesRepo) Ensure(ctx context.Context, msg *entity.Message) error { q, args, err := sq.Insert(r.table). - Columns("bridge_id", "msg_hash", "message_id", "direction", "sender", "executor", "data", "data_type", "gas_limit"). - Values(msg.BridgeID, msg.MsgHash, msg.MessageID, msg.Direction, msg.Sender, msg.Executor, msg.Data, msg.DataType, msg.GasLimit). + Columns("bridge_id", "msg_hash", "message_id", "direction", "sender", "executor", "data", "data_type", "gas_limit", "raw_message"). + Values(msg.BridgeID, msg.MsgHash, msg.MessageID, msg.Direction, msg.Sender, msg.Executor, msg.Data, msg.DataType, msg.GasLimit, msg.RawMessage). Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET updated_at = NOW()"). PlaceholderFormat(sq.Dollar). ToSql() diff --git a/repository/postgres/signed_messages.go b/repository/postgres/signed_messages.go index 764dc16..d153ec6 100644 --- a/repository/postgres/signed_messages.go +++ b/repository/postgres/signed_messages.go @@ -51,7 +51,7 @@ func (r *signedMessagesRepo) GetByLogID(ctx context.Context, logID uint) (*entit return msg, nil } -func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*entity.SignedMessage, error) { +func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*entity.SignedMessage, error) { q, args, err := sq.Select("*"). From(r.table). Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHash}).