diff --git a/config.schema.json b/config.schema.json index cc5d1d8..704fbfc 100644 --- a/config.schema.json +++ b/config.schema.json @@ -237,9 +237,6 @@ "type": "integer" } }, - "required": [ - "event" - ], "additionalProperties": false } }, diff --git a/db/migrations/000005_erc_to_native_add_sender.down.sql b/db/migrations/000005_erc_to_native_add_sender.down.sql new file mode 100644 index 0000000..ccfffe6 --- /dev/null +++ b/db/migrations/000005_erc_to_native_add_sender.down.sql @@ -0,0 +1 @@ +ALTER TABLE erc_to_native_messages DROP COLUMN sender; diff --git a/db/migrations/000005_erc_to_native_add_sender.up.sql b/db/migrations/000005_erc_to_native_add_sender.up.sql new file mode 100644 index 0000000..b432b44 --- /dev/null +++ b/db/migrations/000005_erc_to_native_add_sender.up.sql @@ -0,0 +1,23 @@ +ALTER TABLE erc_to_native_messages + ADD COLUMN sender ADDRESS DEFAULT '\x0000000000000000000000000000000000000000'::ADDRESS; +UPDATE erc_to_native_messages m +SET m.sender = m.receiver +FROM sent_messages sm, + logs l +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND l.id = sm.log_id + AND l.topic0 = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::OPT_WORD; +SET m.sender = substring(l2.topic2 from 12 for 20)::ADDRESS +FROM sent_messages sm, + logs l, + logs l2 +WHERE m.bridge_id = sm.bridge_id + AND m.msg_hash = sm.msg_hash + AND l.id = sm.log_id + AND l.topic0 = '\x127650bcfb0ba017401abe4931453a405140a8fd36fece67bae2db174d3fdd63'::OPT_WORD + AND l2.transaction_hash = l.transaction_hash + AND l2.topic0 = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::OPT_WORD; + +ALTER TABLE erc_to_native_messages + ALTER COLUMN sender DROP DEFAULT; \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index d74a8ea..e021689 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -24,6 +24,13 @@ services: - ./grafana/provisioning:/etc/grafana/provisioning - ./grafana/dashboards:/etc/grafana/dashboards - grafana-storage:/var/lib/grafana + environment: + PG_HOST: 'postgres:5432' + PG_USER: 'postgres' + PG_PASSWORD: 'pass' + PG_DB: 'db' + PROM_USER: 'admin' + PROM_HOST: 'http://prometheus:9090' ports: - "3000:3000" prometheus: diff --git a/entity/erc_to_native_message.go b/entity/erc_to_native_message.go index d3337cb..7a08637 100644 --- a/entity/erc_to_native_message.go +++ b/entity/erc_to_native_message.go @@ -12,6 +12,7 @@ type ErcToNativeMessage struct { BridgeID string `db:"bridge_id"` MsgHash common.Hash `db:"msg_hash"` Direction Direction `db:"direction"` + Sender common.Address `db:"sender"` Receiver common.Address `db:"receiver"` Value string `db:"value"` CreatedAt *time.Time `db:"created_at"` diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index d3425f1..513be61 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -20,6 +20,7 @@ type Client struct { timeout time.Duration rawClient *rpc.Client client *ethclient.Client + signer types.Signer } func NewClient(url string, timeout time.Duration, chainID string) (*Client, error) { @@ -46,6 +47,7 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro if rpcChainID.String() != chainID { return nil, fmt.Errorf("rpc url retunrned different chainID, expected %s, got %s", chainID, rpcChainID) } + client.signer = types.NewLondonSigner(rpcChainID) return client, nil } @@ -155,6 +157,10 @@ func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte return res, err } +func (c *Client) TransactionSender(tx *types.Transaction) (common.Address, error) { + return c.signer.Sender(tx) +} + func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { arg := map[string]interface{}{ "address": q.Addresses, diff --git a/grafana/dashboards/xdai.json b/grafana/dashboards/xdai.json index 168255e..93525af 100644 --- a/grafana/dashboards/xdai.json +++ b/grafana/dashboards/xdai.json @@ -21,7 +21,7 @@ "editable": true, "gnetId": null, "graphTooltip": 0, - "iteration": 1649577198271, + "iteration": 1649851949807, "links": [], "panels": [ { @@ -733,7 +733,7 @@ }, { "id": "custom.width", - "value": 351 + "value": 389 } ] }, @@ -745,7 +745,7 @@ "properties": [ { "id": "custom.width", - "value": 241 + "value": null } ] }, @@ -757,7 +757,7 @@ "properties": [ { "id": "custom.width", - "value": 201 + "value": null }, { "id": "unit", @@ -773,13 +773,41 @@ "properties": [ { "id": "custom.width", - "value": 237 + "value": null }, { "id": "unit", "value": "string" } ] + }, + { + "matcher": { + "id": "byName", + "options": "sender" + }, + "properties": [ + { + "id": "unit", + "value": "string" + }, + { + "id": "custom.width", + "value": 357 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "direction" + }, + "properties": [ + { + "id": "custom.width", + "value": 151 + } + ] } ] }, @@ -801,7 +829,7 @@ "group": [], "metricColumn": "none", "rawQuery": true, - "rawSql": "SELECT m.direction, concat('0x', encode(m.receiver, 'hex')) as receiver, sum(m.value) / 1e18 as total, avg(m.value) / 1e18 as average, count(*)\nFROM erc_to_native_messages m\nWHERE m.bridge_id = '$bridge' GROUP BY m.direction, m.receiver\nHAVING count(*) >= 10\nORDER BY count(*) DESC\n", + "rawSql": "SELECT m.direction, concat('0x', encode(m.sender, 'hex')) as sender, concat('0x', encode(m.receiver, 'hex')) as receiver, sum(m.value) / 1e18 as total, avg(m.value) / 1e18 as average, count(*)\nFROM erc_to_native_messages m\nWHERE m.bridge_id = '$bridge' GROUP BY m.direction, m.sender, m.receiver\nHAVING count(*) >= 10\nORDER BY count(*) DESC\n", "refId": "A", "select": [ [ @@ -1331,5 +1359,5 @@ "timezone": "", "title": "XDAI", "uid": "h48F4hIa2", - "version": 1 + "version": 9 } \ No newline at end of file diff --git a/monitor/handlers.go b/monitor/handlers.go index 52dd43d..5a77b51 100644 --- a/monitor/handlers.go +++ b/monitor/handlers.go @@ -72,12 +72,12 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log return nil } } - receipt, err := p.foreignClient.TransactionReceiptByHash(ctx, log.TransactionHash) + logs, err := p.repo.Logs.FindByTxHash(ctx, log.TransactionHash) if err != nil { - return fmt.Errorf("failed to get transaction receipt by hash %s: %w", log.TransactionHash, err) + return fmt.Errorf("failed to get transaction logs for %s: %w", log.TransactionHash, err) } - for _, l := range receipt.Logs { - if len(l.Topics) > 0 && l.Topics[0] == abi.ERC_TO_NATIVE.Events["UserRequestForAffirmation"].ID { + for _, l := range logs { + if l.Topic0 != nil && *l.Topic0 == abi.ERC_TO_NATIVE.Events["UserRequestForAffirmation"].ID { return nil } } @@ -92,6 +92,7 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log BridgeID: p.bridgeID, Direction: entity.DirectionForeignToHome, MsgHash: msgHash, + Sender: from, Receiver: from, Value: value.String(), } @@ -116,14 +117,35 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont msg = append(msg, log.TransactionHash[:]...) msgHash := crypto.Keccak256Hash(msg) + logs, err := p.repo.Logs.FindByTxHash(ctx, log.TransactionHash) + if err != nil { + return fmt.Errorf("failed to get transaction logs for %s: %w", log.TransactionHash, err) + } + var sender common.Address + for _, l := range logs { + if l.Topic0 != nil && *l.Topic0 == abi.ERC_TO_NATIVE.Events["Transfer"].ID && l.Topic1 != nil && l.Topic2 != nil && len(l.Data) == 32 { + transferSender := common.BytesToAddress(l.Topic1[:]) + transferReceiver := common.BytesToAddress(l.Topic2[:]) + transferValue := new(big.Int).SetBytes(l.Data) + if transferReceiver == p.cfg.Foreign.Address && value.Cmp(transferValue) == 0 { + for _, t := range p.cfg.Foreign.ErcToNativeTokens { + if l.Address == t.Address && l.BlockNumber >= t.StartBlock && (t.EndBlock == 0 || l.BlockNumber <= t.EndBlock) { + sender = transferSender + } + } + } + } + } + message := &entity.ErcToNativeMessage{ BridgeID: p.bridgeID, Direction: entity.DirectionForeignToHome, MsgHash: msgHash, + Sender: sender, Receiver: recipient, Value: value.String(), } - err := p.repo.ErcToNativeMessages.Ensure(ctx, message) + err = p.repo.ErcToNativeMessages.Ensure(ctx, message) if err != nil { return err } @@ -174,14 +196,27 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForSignature(ctx contex msg = append(msg, p.cfg.Foreign.Address[:]...) msgHash := crypto.Keccak256Hash(msg) + sender := recipient + tx, err := p.homeClient.TransactionByHash(ctx, log.TransactionHash) + if err != nil { + return err + } + if tx.Value().Cmp(value) == 0 { + sender, err = p.homeClient.TransactionSender(tx) + if err != nil { + return fmt.Errorf("failed to extract transaction sender: %w", err) + } + } + message := &entity.ErcToNativeMessage{ BridgeID: p.bridgeID, Direction: entity.DirectionHomeToForeign, MsgHash: msgHash, + Sender: sender, Receiver: recipient, Value: value.String(), } - err := p.repo.ErcToNativeMessages.Ensure(ctx, message) + err = p.repo.ErcToNativeMessages.Ensure(ctx, message) if err != nil { return err } diff --git a/monitor/monitor.go b/monitor/monitor.go index 920a017..564e3d1 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -288,8 +288,6 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { m.logger.Info("refetching old events") for _, job := range m.cfg.RefetchEvents { - topic := crypto.Keccak256Hash([]byte(job.Event)) - fromBlock := job.StartBlock if fromBlock < m.cfg.StartBlock { fromBlock = m.cfg.StartBlock @@ -308,11 +306,15 @@ func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { "from_block": fromBlock, "to_block": end, }).Info("scheduling new block range logs search") - m.blocksRangeChan <- &BlocksRange{ - From: fromBlock, - To: end, - Topic: &topic, + br := &BlocksRange{ + From: fromBlock, + To: end, } + if job.Event != "" { + topic := crypto.Keccak256Hash([]byte(job.Event)) + br.Topic = &topic + } + m.blocksRangeChan <- br fromBlock = end + 1 } } diff --git a/repository/postgres/erc_to_native_messages.go b/repository/postgres/erc_to_native_messages.go index e86639e..52c131b 100644 --- a/repository/postgres/erc_to_native_messages.go +++ b/repository/postgres/erc_to_native_messages.go @@ -20,9 +20,9 @@ func NewErcToNativeMessagesRepo(table string, db *db.DB) entity.ErcToNativeMessa func (r *ercToNativeMessagesRepo) Ensure(ctx context.Context, msg *entity.ErcToNativeMessage) error { q, args, err := sq.Insert(r.table). - Columns("bridge_id", "msg_hash", "direction", "receiver", "value"). - Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Receiver, msg.Value). - Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET updated_at = NOW()"). + Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value"). + Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value). + Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET sender = EXCLUDED.sender, updated_at = NOW()"). PlaceholderFormat(sq.Dollar). ToSql() if err != nil {