Add raw_message field

This commit is contained in:
Kirill Fedoseev 2022-06-22 18:42:23 +03:00
parent 9684e9bf2c
commit fc6baff634
13 changed files with 161 additions and 93 deletions

View File

@ -0,0 +1,4 @@
ALTER TABLE messages
DROP COLUMN raw_message;
ALTER TABLE erc_to_native_messages
DROP COLUMN raw_message;

View File

@ -0,0 +1,62 @@
ALTER TABLE messages
ADD COLUMN raw_message BYTEA;
ALTER TABLE erc_to_native_messages
ADD COLUMN raw_message BYTEA;
UPDATE messages m
SET raw_message = l.transaction_hash || substr(l.data, 65, get_byte(l.data, 62) * 256 + get_byte(l.data, 63))
FROM sent_messages sm
JOIN logs l
ON l.id = sm.log_id
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND l.topic0 IN (
E'\\x9df71e9d2175354c68d0e882702c9eb5d63e036345c53639b8c63be4e8764741',
E'\\x733b62005ae93e850dd2b37234e1c7eb634d3b7de068bc0f7f32b7233191a48c'
);
UPDATE messages m
SET raw_message = substr(l.data, 65, get_byte(l.data, 62) * 256 + get_byte(l.data, 63))
FROM sent_messages sm
JOIN logs l
ON l.id = sm.log_id
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND l.topic0 IN (
E'\\x482515ce3d9494a37ce83f18b72b363449458435fafdd7a53ddea7460fe01b58',
E'\\x520d2afde79cbd5db58755ac9480f81bc658e5c517fcae7365a3d832590b0183'
);
UPDATE erc_to_native_messages m
SET raw_message = substr(l.data, 13) || l.transaction_hash || E'\\x4aa42145Aa6Ebf72e164C9bBC74fbD3788045016'
FROM sent_messages sm
JOIN logs l
ON l.id = sm.log_id
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND m.direction::direction_enum = 'home_to_foreign';
UPDATE erc_to_native_messages m
SET raw_message = substr(l.data, 13) || l.transaction_hash
FROM sent_messages sm
JOIN logs l
ON l.id = sm.log_id
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND m.direction::direction_enum = 'foreign_to_home'
AND l.topic0 = E'\\x1d491a427d1f8cc0d447496f300fac39f7306122481d8e663451eb268274146b';
UPDATE erc_to_native_messages m
SET raw_message = substr(l.topic1, 13) || l.data || l.transaction_hash
FROM sent_messages sm
JOIN logs l
ON l.id = sm.log_id
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND m.direction::direction_enum = 'foreign_to_home'
AND l.topic0 = E'\\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';
ALTER TABLE messages
ALTER COLUMN raw_message TYPE BLOB;
ALTER TABLE erc_to_native_messages
ALTER COLUMN raw_message TYPE BLOB;

View File

@ -8,15 +8,16 @@ import (
) )
type ErcToNativeMessage struct { type ErcToNativeMessage struct {
ID uint `db:"id"` ID uint `db:"id"`
BridgeID string `db:"bridge_id"` BridgeID string `db:"bridge_id"`
MsgHash common.Hash `db:"msg_hash"` MsgHash common.Hash `db:"msg_hash"`
Direction Direction `db:"direction"` Direction Direction `db:"direction"`
Sender common.Address `db:"sender"` Sender common.Address `db:"sender"`
Receiver common.Address `db:"receiver"` Receiver common.Address `db:"receiver"`
Value string `db:"value"` Value string `db:"value"`
CreatedAt *time.Time `db:"created_at"` RawMessage []byte `db:"raw_message"`
UpdatedAt *time.Time `db:"updated_at"` CreatedAt *time.Time `db:"created_at"`
UpdatedAt *time.Time `db:"updated_at"`
} }
type ErcToNativeMessagesRepo interface { type ErcToNativeMessagesRepo interface {

View File

@ -30,10 +30,10 @@ type LogsFilter struct {
FromBlock *uint FromBlock *uint
ToBlock *uint ToBlock *uint
TxHash *common.Hash TxHash *common.Hash
Topic0 *common.Hash Topic0 []common.Hash
Topic1 *common.Hash Topic1 []common.Hash
Topic2 *common.Hash Topic2 []common.Hash
Topic3 *common.Hash Topic3 []common.Hash
DataLength *uint DataLength *uint
} }

View File

@ -15,18 +15,19 @@ const (
) )
type Message struct { type Message struct {
ID uint `db:"id"` ID uint `db:"id"`
BridgeID string `db:"bridge_id"` BridgeID string `db:"bridge_id"`
MsgHash common.Hash `db:"msg_hash"` MsgHash common.Hash `db:"msg_hash"`
MessageID common.Hash `db:"message_id"` MessageID common.Hash `db:"message_id"`
Direction Direction `db:"direction"` Direction Direction `db:"direction"`
Sender common.Address `db:"sender"` Sender common.Address `db:"sender"`
Executor common.Address `db:"executor"` Executor common.Address `db:"executor"`
Data []byte `db:"data"` Data []byte `db:"data"`
DataType uint `db:"data_type"` DataType uint `db:"data_type"`
GasLimit uint `db:"gas_limit"` GasLimit uint `db:"gas_limit"`
CreatedAt *time.Time `db:"created_at"` RawMessage []byte `db:"raw_message"`
UpdatedAt *time.Time `db:"updated_at"` CreatedAt *time.Time `db:"created_at"`
UpdatedAt *time.Time `db:"updated_at"`
} }
type MessagesRepo interface { type MessagesRepo interface {

View File

@ -19,6 +19,6 @@ type SignedMessage struct {
type SignedMessagesRepo interface { type SignedMessagesRepo interface {
Ensure(ctx context.Context, msg *SignedMessage) error Ensure(ctx context.Context, msg *SignedMessage) error
GetByLogID(ctx context.Context, logID uint) (*SignedMessage, error) GetByLogID(ctx context.Context, logID uint) (*SignedMessage, error)
FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*SignedMessage, error) FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*SignedMessage, error)
GetLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error) GetLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error)
} }

View File

@ -14,28 +14,30 @@ func unmarshalMessage(bridgeID string, direction entity.Direction, encodedData [
messageID := common.BytesToHash(encodedData[:32]) messageID := common.BytesToHash(encodedData[:32])
if bytes.Equal(messageID[0:4], []byte{0, 4, 0, 0}) { if bytes.Equal(messageID[0:4], []byte{0, 4, 0, 0}) {
return &entity.Message{ return &entity.Message{
BridgeID: bridgeID, BridgeID: bridgeID,
Direction: direction, Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData), MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: messageID, MessageID: messageID,
Sender: common.BytesToAddress(encodedData[64:84]), Sender: common.BytesToAddress(encodedData[64:84]),
Executor: common.BytesToAddress(encodedData[84:104]), Executor: common.BytesToAddress(encodedData[84:104]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[104:108])), GasLimit: uint(binary.BigEndian.Uint32(encodedData[104:108])),
DataType: uint(encodedData[108]), DataType: uint(encodedData[108]),
Data: encodedData[108:], Data: encodedData[108:],
RawMessage: encodedData,
} }
} }
if bytes.Equal(messageID[0:4], []byte{0, 5, 0, 0}) { if bytes.Equal(messageID[0:4], []byte{0, 5, 0, 0}) {
return &entity.Message{ return &entity.Message{
BridgeID: bridgeID, BridgeID: bridgeID,
Direction: direction, Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData), MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: messageID, MessageID: messageID,
Sender: common.BytesToAddress(encodedData[32:52]), Sender: common.BytesToAddress(encodedData[32:52]),
Executor: common.BytesToAddress(encodedData[52:72]), Executor: common.BytesToAddress(encodedData[52:72]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])), GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])),
DataType: uint(encodedData[78]), DataType: uint(encodedData[78]),
Data: encodedData[79+encodedData[76]+encodedData[77]:], Data: encodedData[79+encodedData[76]+encodedData[77]:],
RawMessage: encodedData,
} }
} }
panic("unsupported message version prefix") panic("unsupported message version prefix")
@ -48,15 +50,16 @@ func unmarshalLegacyMessage(bridgeID string, direction entity.Direction, encoded
} }
return &entity.Message{ return &entity.Message{
BridgeID: bridgeID, BridgeID: bridgeID,
Direction: direction, Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData), MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: common.BytesToHash(encodedData[:32]), MessageID: common.BytesToHash(encodedData[:32]),
Sender: common.BytesToAddress(encodedData[32:52]), Sender: common.BytesToAddress(encodedData[32:52]),
Executor: common.BytesToAddress(encodedData[52:72]), Executor: common.BytesToAddress(encodedData[52:72]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[100:104])), GasLimit: uint(binary.BigEndian.Uint32(encodedData[100:104])),
DataType: 0, DataType: 0,
Data: encodedData[105:], Data: encodedData[105:],
RawMessage: encodedData,
} }
} }

View File

@ -95,7 +95,7 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log
FromBlock: &log.BlockNumber, FromBlock: &log.BlockNumber,
ToBlock: &log.BlockNumber, ToBlock: &log.BlockNumber,
TxHash: &log.TransactionHash, TxHash: &log.TransactionHash,
Topic0: &bridgeabi.ErcToNativeUserRequestForAffirmationEventSignature, Topic0: []common.Hash{bridgeabi.ErcToNativeUserRequestForAffirmationEventSignature},
} }
logs, err := p.repo.Logs.Find(ctx, filter) logs, err := p.repo.Logs.Find(ctx, filter)
if err != nil { if err != nil {
@ -112,12 +112,13 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log
msgHash := crypto.Keccak256Hash(msg) msgHash := crypto.Keccak256Hash(msg)
message := &entity.ErcToNativeMessage{ message := &entity.ErcToNativeMessage{
BridgeID: p.bridgeID, BridgeID: p.bridgeID,
Direction: entity.DirectionForeignToHome, Direction: entity.DirectionForeignToHome,
MsgHash: msgHash, MsgHash: msgHash,
Sender: from, Sender: from,
Receiver: from, Receiver: from,
Value: value.String(), Value: value.String(),
RawMessage: msg,
} }
err = p.repo.ErcToNativeMessages.Ensure(ctx, message) err = p.repo.ErcToNativeMessages.Ensure(ctx, message)
if err != nil { if err != nil {
@ -152,8 +153,8 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont
FromBlock: &log.BlockNumber, FromBlock: &log.BlockNumber,
ToBlock: &log.BlockNumber, ToBlock: &log.BlockNumber,
TxHash: &log.TransactionHash, TxHash: &log.TransactionHash,
Topic0: &bridgeabi.ErcToNativeTransferEventSignature, Topic0: []common.Hash{bridgeabi.ErcToNativeTransferEventSignature},
Topic2: hashPtr(p.cfg.Foreign.Address.Hash()), Topic2: []common.Hash{p.cfg.Foreign.Address.Hash()},
DataLength: uintPtr(32), DataLength: uintPtr(32),
} }
logs, err := p.repo.Logs.Find(ctx, filter) logs, err := p.repo.Logs.Find(ctx, filter)
@ -169,12 +170,13 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont
} }
message := &entity.ErcToNativeMessage{ message := &entity.ErcToNativeMessage{
BridgeID: p.bridgeID, BridgeID: p.bridgeID,
Direction: entity.DirectionForeignToHome, Direction: entity.DirectionForeignToHome,
MsgHash: msgHash, MsgHash: msgHash,
Sender: sender, Sender: sender,
Receiver: recipient, Receiver: recipient,
Value: value.String(), Value: value.String(),
RawMessage: msg,
} }
err = p.repo.ErcToNativeMessages.Ensure(ctx, message) err = p.repo.ErcToNativeMessages.Ensure(ctx, message)
if err != nil { if err != nil {
@ -252,12 +254,13 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForSignature(ctx contex
} }
message := &entity.ErcToNativeMessage{ message := &entity.ErcToNativeMessage{
BridgeID: p.bridgeID, BridgeID: p.bridgeID,
Direction: entity.DirectionHomeToForeign, Direction: entity.DirectionHomeToForeign,
MsgHash: msgHash, MsgHash: msgHash,
Sender: sender, Sender: sender,
Receiver: recipient, Receiver: recipient,
Value: value.String(), Value: value.String(),
RawMessage: msg,
} }
err = p.repo.ErcToNativeMessages.Ensure(ctx, message) err = p.repo.ErcToNativeMessages.Ensure(ctx, message)
if err != nil { if err != nil {

View File

@ -1,11 +1,5 @@
package monitor package monitor
import "github.com/ethereum/go-ethereum/common"
func uintPtr(v uint) *uint { func uintPtr(v uint) *uint {
return &v return &v
} }
func hashPtr(v common.Hash) *common.Hash {
return &v
}

View File

@ -19,8 +19,8 @@ func NewErcToNativeMessagesRepo(table string, db *db.DB) entity.ErcToNativeMessa
func (r *ercToNativeMessagesRepo) Ensure(ctx context.Context, msg *entity.ErcToNativeMessage) error { func (r *ercToNativeMessagesRepo) Ensure(ctx context.Context, msg *entity.ErcToNativeMessage) error {
q, args, err := sq.Insert(r.table). q, args, err := sq.Insert(r.table).
Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value"). Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value", "raw_message").
Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value). Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value, msg.RawMessage).
Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET sender = EXCLUDED.sender, updated_at = NOW()"). Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET sender = EXCLUDED.sender, updated_at = NOW()").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
ToSql() ToSql()

View File

@ -82,17 +82,17 @@ func (r *logsRepo) Find(ctx context.Context, filter entity.LogsFilter) ([]*entit
if filter.TxHash != nil { if filter.TxHash != nil {
cond = append(cond, sq.Eq{"transaction_hash": *filter.TxHash}) cond = append(cond, sq.Eq{"transaction_hash": *filter.TxHash})
} }
if filter.Topic0 != nil { if len(filter.Topic0) > 0 {
cond = append(cond, sq.Eq{"topic0": *filter.Topic0}) cond = append(cond, sq.Eq{"topic0": filter.Topic0})
} }
if filter.Topic1 != nil { if len(filter.Topic1) > 0 {
cond = append(cond, sq.Eq{"topic1": *filter.Topic1}) cond = append(cond, sq.Eq{"topic1": filter.Topic1})
} }
if filter.Topic2 != nil { if len(filter.Topic2) > 0 {
cond = append(cond, sq.Eq{"topic2": *filter.Topic2}) cond = append(cond, sq.Eq{"topic2": filter.Topic2})
} }
if filter.Topic3 != nil { if len(filter.Topic3) > 0 {
cond = append(cond, sq.Eq{"topic3": *filter.Topic3}) cond = append(cond, sq.Eq{"topic3": filter.Topic3})
} }
if filter.DataLength != nil { if filter.DataLength != nil {
cond = append(cond, sq.Eq{"length(data)": *filter.DataLength}) cond = append(cond, sq.Eq{"length(data)": *filter.DataLength})

View File

@ -19,8 +19,8 @@ func NewMessagesRepo(table string, db *db.DB) entity.MessagesRepo {
func (r *messagesRepo) Ensure(ctx context.Context, msg *entity.Message) error { func (r *messagesRepo) Ensure(ctx context.Context, msg *entity.Message) error {
q, args, err := sq.Insert(r.table). q, args, err := sq.Insert(r.table).
Columns("bridge_id", "msg_hash", "message_id", "direction", "sender", "executor", "data", "data_type", "gas_limit"). Columns("bridge_id", "msg_hash", "message_id", "direction", "sender", "executor", "data", "data_type", "gas_limit", "raw_message").
Values(msg.BridgeID, msg.MsgHash, msg.MessageID, msg.Direction, msg.Sender, msg.Executor, msg.Data, msg.DataType, msg.GasLimit). Values(msg.BridgeID, msg.MsgHash, msg.MessageID, msg.Direction, msg.Sender, msg.Executor, msg.Data, msg.DataType, msg.GasLimit, msg.RawMessage).
Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET updated_at = NOW()"). Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET updated_at = NOW()").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
ToSql() ToSql()

View File

@ -51,7 +51,7 @@ func (r *signedMessagesRepo) GetByLogID(ctx context.Context, logID uint) (*entit
return msg, nil return msg, nil
} }
func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*entity.SignedMessage, error) { func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*entity.SignedMessage, error) {
q, args, err := sq.Select("*"). q, args, err := sq.Select("*").
From(r.table). From(r.table).
Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHash}). Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHash}).