Add erc_to_native sender

This commit is contained in:
Kirill Fedoseev 2022-04-13 16:47:28 +04:00
parent c392391646
commit eaa3d7ba19
10 changed files with 125 additions and 25 deletions

View File

@ -237,9 +237,6 @@
"type": "integer"
}
},
"required": [
"event"
],
"additionalProperties": false
}
},

View File

@ -0,0 +1 @@
ALTER TABLE erc_to_native_messages DROP COLUMN sender;

View File

@ -0,0 +1,23 @@
ALTER TABLE erc_to_native_messages
ADD COLUMN sender ADDRESS DEFAULT '\x0000000000000000000000000000000000000000'::ADDRESS;
UPDATE erc_to_native_messages m
SET m.sender = m.receiver
FROM sent_messages sm,
logs l
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND l.id = sm.log_id
AND l.topic0 = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::OPT_WORD;
SET m.sender = substring(l2.topic2 from 12 for 20)::ADDRESS
FROM sent_messages sm,
logs l,
logs l2
WHERE m.bridge_id = sm.bridge_id
AND m.msg_hash = sm.msg_hash
AND l.id = sm.log_id
AND l.topic0 = '\x127650bcfb0ba017401abe4931453a405140a8fd36fece67bae2db174d3fdd63'::OPT_WORD
AND l2.transaction_hash = l.transaction_hash
AND l2.topic0 = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::OPT_WORD;
ALTER TABLE erc_to_native_messages
ALTER COLUMN sender DROP DEFAULT;

View File

@ -24,6 +24,13 @@ services:
- ./grafana/provisioning:/etc/grafana/provisioning
- ./grafana/dashboards:/etc/grafana/dashboards
- grafana-storage:/var/lib/grafana
environment:
PG_HOST: 'postgres:5432'
PG_USER: 'postgres'
PG_PASSWORD: 'pass'
PG_DB: 'db'
PROM_USER: 'admin'
PROM_HOST: 'http://prometheus:9090'
ports:
- "3000:3000"
prometheus:

View File

@ -12,6 +12,7 @@ type ErcToNativeMessage struct {
BridgeID string `db:"bridge_id"`
MsgHash common.Hash `db:"msg_hash"`
Direction Direction `db:"direction"`
Sender common.Address `db:"sender"`
Receiver common.Address `db:"receiver"`
Value string `db:"value"`
CreatedAt *time.Time `db:"created_at"`

View File

@ -20,6 +20,7 @@ type Client struct {
timeout time.Duration
rawClient *rpc.Client
client *ethclient.Client
signer types.Signer
}
func NewClient(url string, timeout time.Duration, chainID string) (*Client, error) {
@ -46,6 +47,7 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro
if rpcChainID.String() != chainID {
return nil, fmt.Errorf("rpc url retunrned different chainID, expected %s, got %s", chainID, rpcChainID)
}
client.signer = types.NewLondonSigner(rpcChainID)
return client, nil
}
@ -155,6 +157,10 @@ func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte
return res, err
}
func (c *Client) TransactionSender(tx *types.Transaction) (common.Address, error) {
return c.signer.Sender(tx)
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,

View File

@ -21,7 +21,7 @@
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"iteration": 1649577198271,
"iteration": 1649851949807,
"links": [],
"panels": [
{
@ -733,7 +733,7 @@
},
{
"id": "custom.width",
"value": 351
"value": 389
}
]
},
@ -745,7 +745,7 @@
"properties": [
{
"id": "custom.width",
"value": 241
"value": null
}
]
},
@ -757,7 +757,7 @@
"properties": [
{
"id": "custom.width",
"value": 201
"value": null
},
{
"id": "unit",
@ -773,13 +773,41 @@
"properties": [
{
"id": "custom.width",
"value": 237
"value": null
},
{
"id": "unit",
"value": "string"
}
]
},
{
"matcher": {
"id": "byName",
"options": "sender"
},
"properties": [
{
"id": "unit",
"value": "string"
},
{
"id": "custom.width",
"value": 357
}
]
},
{
"matcher": {
"id": "byName",
"options": "direction"
},
"properties": [
{
"id": "custom.width",
"value": 151
}
]
}
]
},
@ -801,7 +829,7 @@
"group": [],
"metricColumn": "none",
"rawQuery": true,
"rawSql": "SELECT m.direction, concat('0x', encode(m.receiver, 'hex')) as receiver, sum(m.value) / 1e18 as total, avg(m.value) / 1e18 as average, count(*)\nFROM erc_to_native_messages m\nWHERE m.bridge_id = '$bridge' GROUP BY m.direction, m.receiver\nHAVING count(*) >= 10\nORDER BY count(*) DESC\n",
"rawSql": "SELECT m.direction, concat('0x', encode(m.sender, 'hex')) as sender, concat('0x', encode(m.receiver, 'hex')) as receiver, sum(m.value) / 1e18 as total, avg(m.value) / 1e18 as average, count(*)\nFROM erc_to_native_messages m\nWHERE m.bridge_id = '$bridge' GROUP BY m.direction, m.sender, m.receiver\nHAVING count(*) >= 10\nORDER BY count(*) DESC\n",
"refId": "A",
"select": [
[
@ -1331,5 +1359,5 @@
"timezone": "",
"title": "XDAI",
"uid": "h48F4hIa2",
"version": 1
"version": 9
}

View File

@ -72,12 +72,12 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log
return nil
}
}
receipt, err := p.foreignClient.TransactionReceiptByHash(ctx, log.TransactionHash)
logs, err := p.repo.Logs.FindByTxHash(ctx, log.TransactionHash)
if err != nil {
return fmt.Errorf("failed to get transaction receipt by hash %s: %w", log.TransactionHash, err)
return fmt.Errorf("failed to get transaction logs for %s: %w", log.TransactionHash, err)
}
for _, l := range receipt.Logs {
if len(l.Topics) > 0 && l.Topics[0] == abi.ERC_TO_NATIVE.Events["UserRequestForAffirmation"].ID {
for _, l := range logs {
if l.Topic0 != nil && *l.Topic0 == abi.ERC_TO_NATIVE.Events["UserRequestForAffirmation"].ID {
return nil
}
}
@ -92,6 +92,7 @@ func (p *BridgeEventHandler) HandleErcToNativeTransfer(ctx context.Context, log
BridgeID: p.bridgeID,
Direction: entity.DirectionForeignToHome,
MsgHash: msgHash,
Sender: from,
Receiver: from,
Value: value.String(),
}
@ -116,14 +117,35 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForAffirmation(ctx cont
msg = append(msg, log.TransactionHash[:]...)
msgHash := crypto.Keccak256Hash(msg)
logs, err := p.repo.Logs.FindByTxHash(ctx, log.TransactionHash)
if err != nil {
return fmt.Errorf("failed to get transaction logs for %s: %w", log.TransactionHash, err)
}
var sender common.Address
for _, l := range logs {
if l.Topic0 != nil && *l.Topic0 == abi.ERC_TO_NATIVE.Events["Transfer"].ID && l.Topic1 != nil && l.Topic2 != nil && len(l.Data) == 32 {
transferSender := common.BytesToAddress(l.Topic1[:])
transferReceiver := common.BytesToAddress(l.Topic2[:])
transferValue := new(big.Int).SetBytes(l.Data)
if transferReceiver == p.cfg.Foreign.Address && value.Cmp(transferValue) == 0 {
for _, t := range p.cfg.Foreign.ErcToNativeTokens {
if l.Address == t.Address && l.BlockNumber >= t.StartBlock && (t.EndBlock == 0 || l.BlockNumber <= t.EndBlock) {
sender = transferSender
}
}
}
}
}
message := &entity.ErcToNativeMessage{
BridgeID: p.bridgeID,
Direction: entity.DirectionForeignToHome,
MsgHash: msgHash,
Sender: sender,
Receiver: recipient,
Value: value.String(),
}
err := p.repo.ErcToNativeMessages.Ensure(ctx, message)
err = p.repo.ErcToNativeMessages.Ensure(ctx, message)
if err != nil {
return err
}
@ -174,14 +196,27 @@ func (p *BridgeEventHandler) HandleErcToNativeUserRequestForSignature(ctx contex
msg = append(msg, p.cfg.Foreign.Address[:]...)
msgHash := crypto.Keccak256Hash(msg)
sender := recipient
tx, err := p.homeClient.TransactionByHash(ctx, log.TransactionHash)
if err != nil {
return err
}
if tx.Value().Cmp(value) == 0 {
sender, err = p.homeClient.TransactionSender(tx)
if err != nil {
return fmt.Errorf("failed to extract transaction sender: %w", err)
}
}
message := &entity.ErcToNativeMessage{
BridgeID: p.bridgeID,
Direction: entity.DirectionHomeToForeign,
MsgHash: msgHash,
Sender: sender,
Receiver: recipient,
Value: value.String(),
}
err := p.repo.ErcToNativeMessages.Ensure(ctx, message)
err = p.repo.ErcToNativeMessages.Ensure(ctx, message)
if err != nil {
return err
}

View File

@ -288,8 +288,6 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) {
m.logger.Info("refetching old events")
for _, job := range m.cfg.RefetchEvents {
topic := crypto.Keccak256Hash([]byte(job.Event))
fromBlock := job.StartBlock
if fromBlock < m.cfg.StartBlock {
fromBlock = m.cfg.StartBlock
@ -308,11 +306,15 @@ func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) {
"from_block": fromBlock,
"to_block": end,
}).Info("scheduling new block range logs search")
m.blocksRangeChan <- &BlocksRange{
From: fromBlock,
To: end,
Topic: &topic,
br := &BlocksRange{
From: fromBlock,
To: end,
}
if job.Event != "" {
topic := crypto.Keccak256Hash([]byte(job.Event))
br.Topic = &topic
}
m.blocksRangeChan <- br
fromBlock = end + 1
}
}

View File

@ -20,9 +20,9 @@ func NewErcToNativeMessagesRepo(table string, db *db.DB) entity.ErcToNativeMessa
func (r *ercToNativeMessagesRepo) Ensure(ctx context.Context, msg *entity.ErcToNativeMessage) error {
q, args, err := sq.Insert(r.table).
Columns("bridge_id", "msg_hash", "direction", "receiver", "value").
Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Receiver, msg.Value).
Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET updated_at = NOW()").
Columns("bridge_id", "msg_hash", "direction", "sender", "receiver", "value").
Values(msg.BridgeID, msg.MsgHash, msg.Direction, msg.Sender, msg.Receiver, msg.Value).
Suffix("ON CONFLICT (bridge_id, msg_hash) DO UPDATE SET sender = EXCLUDED.sender, updated_at = NOW()").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {