Presenter endpoint for getting messages without sufficient signatures

This commit is contained in:
Kirill Fedoseev 2022-06-22 19:57:31 +03:00
parent fc6baff634
commit 7d51bb6957
18 changed files with 390 additions and 58 deletions

View File

@ -3,6 +3,7 @@ package contract
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -34,3 +35,11 @@ func (c *BridgeContract) ValidatorContractAddress(ctx context.Context) (common.A
} }
return common.BytesToAddress(res), nil return common.BytesToAddress(res), nil
} }
func (c *BridgeContract) RequiredSignatures(ctx context.Context) (uint, error) {
res, err := c.Call(ctx, "requiredSignatures")
if err != nil {
return 0, fmt.Errorf("cannot obtain required signatures: %w", err)
}
return uint(new(big.Int).SetBytes(res).Uint64()), nil
}

View File

@ -13,6 +13,20 @@
"stateMutability": "view", "stateMutability": "view",
"type": "function" "type": "function"
}, },
{
"constant": true,
"inputs": [],
"name": "requiredSignatures",
"outputs": [
{
"name": "",
"type": "uint256"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{ {
"anonymous": false, "anonymous": false,
"inputs": [ "inputs": [

29
entity/bridge_message.go Normal file
View File

@ -0,0 +1,29 @@
package entity
import "github.com/ethereum/go-ethereum/common"
type BridgeMessage interface {
GetMsgHash() common.Hash
GetMessageID() common.Hash
GetDirection() Direction
GetRawMessage() []byte
}
func ToBridgeMessages(v interface{}) []BridgeMessage {
switch msgs := v.(type) {
case []*Message:
res := make([]BridgeMessage, len(msgs))
for i, msg := range msgs {
res[i] = msg
}
return res
case []*ErcToNativeMessage:
res := make([]BridgeMessage, len(msgs))
for i, msg := range msgs {
res[i] = msg
}
return res
default:
return nil
}
}

View File

@ -20,6 +20,22 @@ type ErcToNativeMessage struct {
UpdatedAt *time.Time `db:"updated_at"` UpdatedAt *time.Time `db:"updated_at"`
} }
func (m *ErcToNativeMessage) GetMsgHash() common.Hash {
return m.MsgHash
}
func (m *ErcToNativeMessage) GetMessageID() common.Hash {
return m.MsgHash
}
func (m *ErcToNativeMessage) GetDirection() Direction {
return m.Direction
}
func (m *ErcToNativeMessage) GetRawMessage() []byte {
return m.RawMessage
}
type ErcToNativeMessagesRepo interface { type ErcToNativeMessagesRepo interface {
Ensure(ctx context.Context, msg *ErcToNativeMessage) error Ensure(ctx context.Context, msg *ErcToNativeMessage) error
GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*ErcToNativeMessage, error) GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*ErcToNativeMessage, error)

View File

@ -41,6 +41,7 @@ type LogsRepo interface {
Ensure(ctx context.Context, logs ...*Log) error Ensure(ctx context.Context, logs ...*Log) error
GetByID(ctx context.Context, id uint) (*Log, error) GetByID(ctx context.Context, id uint) (*Log, error)
Find(ctx context.Context, filter LogsFilter) ([]*Log, error) Find(ctx context.Context, filter LogsFilter) ([]*Log, error)
FindByIDs(ctx context.Context, ids []uint) ([]*Log, error)
} }
func NewLog(chainID string, log types.Log) *Log { func NewLog(chainID string, log types.Log) *Log {

View File

@ -30,6 +30,22 @@ type Message struct {
UpdatedAt *time.Time `db:"updated_at"` UpdatedAt *time.Time `db:"updated_at"`
} }
func (m *Message) GetMsgHash() common.Hash {
return m.MsgHash
}
func (m *Message) GetMessageID() common.Hash {
return m.MessageID
}
func (m *Message) GetDirection() Direction {
return m.Direction
}
func (m *Message) GetRawMessage() []byte {
return m.RawMessage
}
type MessagesRepo interface { type MessagesRepo interface {
Ensure(ctx context.Context, msg *Message) error Ensure(ctx context.Context, msg *Message) error
GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*Message, error) GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*Message, error)

View File

@ -19,4 +19,5 @@ type SentMessagesRepo interface {
Ensure(ctx context.Context, msg *SentMessage) error Ensure(ctx context.Context, msg *SentMessage) error
GetByLogID(ctx context.Context, logID uint) (*SentMessage, error) GetByLogID(ctx context.Context, logID uint) (*SentMessage, error)
GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*SentMessage, error) GetByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) (*SentMessage, error)
FindByMsgHashes(ctx context.Context, bridgeID string, msgHashes []common.Hash) ([]*SentMessage, error)
} }

View File

@ -19,6 +19,6 @@ type SignedMessage struct {
type SignedMessagesRepo interface { type SignedMessagesRepo interface {
Ensure(ctx context.Context, msg *SignedMessage) error Ensure(ctx context.Context, msg *SignedMessage) error
GetByLogID(ctx context.Context, logID uint) (*SignedMessage, error) GetByLogID(ctx context.Context, logID uint) (*SignedMessage, error)
FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*SignedMessage, error) FindByMsgHashes(ctx context.Context, bridgeID string, msgHashes []common.Hash) ([]*SignedMessage, error)
GetLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error) GetLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error)
} }

View File

@ -22,6 +22,7 @@ var (
) )
type Client interface { type Client interface {
Close()
BlockNumber(ctx context.Context) (uint, error) BlockNumber(ctx context.Context) (uint, error)
HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
@ -69,6 +70,10 @@ func NewClient(url string, timeout time.Duration, chainID string) (Client, error
return client, nil return client, nil
} }
func (c *rpcClient) Close() {
c.rawClient.Close()
}
func (c *rpcClient) BlockNumber(ctx context.Context) (uint, error) { func (c *rpcClient) BlockNumber(ctx context.Context) (uint, error) {
defer ObserveDuration(c.chainID, c.url, "eth_blockNumber")() defer ObserveDuration(c.chainID, c.url, "eth_blockNumber")()
ctx, cancel := context.WithTimeout(ctx, c.timeout) ctx, cancel := context.WithTimeout(ctx, c.timeout)

View File

@ -2,22 +2,27 @@ package presenter
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware" chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/poanetwork/tokenbridge-monitor/config" "github.com/poanetwork/tokenbridge-monitor/config"
"github.com/poanetwork/tokenbridge-monitor/contract"
"github.com/poanetwork/tokenbridge-monitor/db" "github.com/poanetwork/tokenbridge-monitor/db"
"github.com/poanetwork/tokenbridge-monitor/entity" "github.com/poanetwork/tokenbridge-monitor/entity"
"github.com/poanetwork/tokenbridge-monitor/ethclient"
"github.com/poanetwork/tokenbridge-monitor/logging" "github.com/poanetwork/tokenbridge-monitor/logging"
"github.com/poanetwork/tokenbridge-monitor/presenter/http/middleware" "github.com/poanetwork/tokenbridge-monitor/presenter/http/middleware"
"github.com/poanetwork/tokenbridge-monitor/presenter/http/render" "github.com/poanetwork/tokenbridge-monitor/presenter/http/render"
"github.com/poanetwork/tokenbridge-monitor/repository" "github.com/poanetwork/tokenbridge-monitor/repository"
"github.com/poanetwork/tokenbridge-monitor/utils"
) )
var ( var (
@ -61,6 +66,7 @@ func (p *Presenter) Serve(addr string) error {
r.Get("/config", p.GetBridgeConfig) r.Get("/config", p.GetBridgeConfig)
r.Get("/validators", p.GetBridgeValidators) r.Get("/validators", p.GetBridgeValidators)
r.Get("/pending", p.GetPendingMessages) r.Get("/pending", p.GetPendingMessages)
r.Post("/unsigned", p.GetMessagesWithMissingSignatures)
}) })
p.root.Route("/chain/{chainID:[0-9]+}", func(r chi.Router) { p.root.Route("/chain/{chainID:[0-9]+}", func(r chi.Router) {
r.Use(middleware.GetChainConfigMiddleware(p.cfg)) r.Use(middleware.GetChainConfigMiddleware(p.cfg))
@ -88,6 +94,18 @@ func (p *Presenter) Serve(addr string) error {
return http.ListenAndServe(addr, p.root) return http.ListenAndServe(addr, p.root)
} }
func (p *Presenter) findActiveValidatorAddresses(ctx context.Context, bridgeID, chainID string) ([]common.Address, error) {
validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID, chainID)
if err != nil {
return nil, fmt.Errorf("failed to find validators for bridge id: %w", err)
}
validatorAddresses := make([]common.Address, len(validators))
for i, v := range validators {
validatorAddresses[i] = v.Address
}
return validatorAddresses, nil
}
func (p *Presenter) getBridgeSideInfo(ctx context.Context, bridgeID string, cfg *config.BridgeSideConfig) (*BridgeSideInfo, error) { func (p *Presenter) getBridgeSideInfo(ctx context.Context, bridgeID string, cfg *config.BridgeSideConfig) (*BridgeSideInfo, error) {
cursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, cfg.Chain.ChainID, cfg.Address) cursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, cfg.Chain.ChainID, cfg.Address)
if err != nil { if err != nil {
@ -107,13 +125,9 @@ func (p *Presenter) getBridgeSideInfo(ctx context.Context, bridgeID string, cfg
} }
} }
validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID, cfg.Chain.ChainID) validators, err := p.findActiveValidatorAddresses(ctx, bridgeID, cfg.Chain.ChainID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find validators for bridge id: %w", err) return nil, err
}
validatorAddresses := make([]common.Address, len(validators))
for i, v := range validators {
validatorAddresses[i] = v.Address
} }
return &BridgeSideInfo{ return &BridgeSideInfo{
@ -124,7 +138,7 @@ func (p *Presenter) getBridgeSideInfo(ctx context.Context, bridgeID string, cfg
LastFetchBlockTime: lastFetchedBlockTime, LastFetchBlockTime: lastFetchedBlockTime,
LastProcessedBlock: cursor.LastProcessedBlock, LastProcessedBlock: cursor.LastProcessedBlock,
LastProcessedBlockTime: lastProcessedBlockTime, LastProcessedBlockTime: lastProcessedBlockTime,
Validators: validatorAddresses, Validators: validators,
}, nil }, nil
} }
@ -169,13 +183,13 @@ func (p *Presenter) GetBridgeValidators(w http.ResponseWriter, r *http.Request)
ctx := r.Context() ctx := r.Context()
cfg := middleware.BridgeConfig(ctx) cfg := middleware.BridgeConfig(ctx)
homeValidators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, cfg.ID, cfg.Home.Chain.ChainID) homeValidators, err := p.findActiveValidatorAddresses(ctx, cfg.ID, cfg.Home.Chain.ChainID)
if err != nil { if err != nil {
render.Error(w, r, fmt.Errorf("failed to find home validators: %w", err)) render.Error(w, r, fmt.Errorf("failed to find home validators: %w", err))
return return
} }
foreignValidators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, cfg.ID, cfg.Home.Chain.ChainID) foreignValidators, err := p.findActiveValidatorAddresses(ctx, cfg.ID, cfg.Home.Chain.ChainID)
if err != nil { if err != nil {
render.Error(w, r, fmt.Errorf("failed to find home validators: %w", err)) render.Error(w, r, fmt.Errorf("failed to find home validators: %w", err))
return return
@ -190,15 +204,15 @@ func (p *Presenter) GetBridgeValidators(w http.ResponseWriter, r *http.Request)
seenValidators := make(map[common.Address]bool, len(validators)) seenValidators := make(map[common.Address]bool, len(validators))
for _, val := range validators { for _, val := range validators {
if seenValidators[val.Address] { if seenValidators[val] {
continue continue
} }
seenValidators[val.Address] = true seenValidators[val] = true
valInfo := &ValidatorInfo{ valInfo := &ValidatorInfo{
Address: val.Address, Address: val,
} }
confirmation, err2 := p.repo.SignedMessages.GetLatest(ctx, cfg.ID, cfg.Home.Chain.ChainID, val.Address) confirmation, err2 := p.repo.SignedMessages.GetLatest(ctx, cfg.ID, cfg.Home.Chain.ChainID, val)
if err2 != nil { if err2 != nil {
if !errors.Is(err2, db.ErrNotFound) { if !errors.Is(err2, db.ErrNotFound) {
render.Error(w, r, fmt.Errorf("failed to find latest validator confirmation: %w", err2)) render.Error(w, r, fmt.Errorf("failed to find latest validator confirmation: %w", err2))
@ -221,33 +235,169 @@ func (p *Presenter) GetPendingMessages(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
cfg := middleware.BridgeConfig(ctx) cfg := middleware.BridgeConfig(ctx)
if cfg.BridgeMode == config.BridgeModeErcToNative { msgs, err := p.repo.FindPendingMessages(ctx, cfg.ID, cfg.BridgeMode)
messages, err := p.repo.ErcToNativeMessages.FindPendingMessages(ctx, cfg.ID) if err != nil {
if err != nil { render.Error(w, r, fmt.Errorf("can't find pending messages: %w", err))
render.Error(w, r, fmt.Errorf("can't find pending messages: %w", err)) return
return
}
res := make([]*ErcToNativeMessageInfo, len(messages))
for i, m := range messages {
res[i] = NewErcToNativeMessageInfo(m)
}
render.JSON(w, r, http.StatusOK, res)
} else {
messages, err := p.repo.Messages.FindPendingMessages(ctx, cfg.ID)
if err != nil {
render.Error(w, r, fmt.Errorf("can't find pending messages: %w", err))
return
}
res := make([]*MessageInfo, len(messages))
for i, m := range messages {
res[i] = NewMessageInfo(m)
}
render.JSON(w, r, http.StatusOK, res)
} }
res := make([]interface{}, len(msgs))
for i, m := range msgs {
res[i] = NewBridgeMessageInfo(m)
}
render.JSON(w, r, http.StatusOK, res)
}
//nolint:funlen,cyclop
func (p *Presenter) GetMessagesWithMissingSignatures(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
cfg := middleware.BridgeConfig(ctx)
foreignClient, err := ethclient.NewClient(cfg.Foreign.Chain.RPC.Host, cfg.Foreign.Chain.RPC.Timeout, cfg.Foreign.Chain.ChainID)
if err != nil {
render.Error(w, r, fmt.Errorf("can't connect to foreign chain: %w", err))
return
}
defer foreignClient.Close()
bridgeContract := contract.NewBridgeContract(foreignClient, cfg.Foreign.Address, cfg.BridgeMode)
requiredSignatures, err := bridgeContract.RequiredSignatures(ctx)
if err != nil {
render.Error(w, r, fmt.Errorf("can't get required signatures: %w", err))
return
}
validators, err := p.findActiveValidatorAddresses(ctx, cfg.ID, cfg.Foreign.Chain.ChainID)
if err != nil {
render.Error(w, r, fmt.Errorf("can't get active validators: %w", err))
return
}
msgs, err := p.repo.FindPendingMessages(ctx, cfg.ID, cfg.BridgeMode)
if err != nil {
render.Error(w, r, fmt.Errorf("can't find pending messages: %w", err))
return
}
msgHashes := make([]common.Hash, len(msgs))
messages := make(map[common.Hash]entity.BridgeMessage, len(msgs))
rawMessages := make(map[common.Hash][]byte, len(msgs))
sentTxLinks := make(map[common.Hash]string, len(msgs))
for i, msg := range msgs {
if msg.GetDirection() == entity.DirectionHomeToForeign {
msgHashes[i] = msg.GetMsgHash()
messages[msg.GetMsgHash()] = msg
rawMessages[msg.GetMsgHash()] = msg.GetRawMessage()
}
}
signatures, err := p.repo.SignedMessages.FindByMsgHashes(ctx, cfg.ID, msgHashes)
if err != nil {
render.Error(w, r, fmt.Errorf("can't find signed signatures: %w", err))
return
}
sentMsgs, err := p.repo.SentMessages.FindByMsgHashes(ctx, cfg.ID, msgHashes)
if err != nil {
render.Error(w, r, fmt.Errorf("can't find sent messages: %w", err))
return
}
logIDs := make([]uint, len(sentMsgs))
sentMsgHashMap := make(map[uint]common.Hash, len(sentMsgs))
for i, sentMsg := range sentMsgs {
logIDs[i] = sentMsg.LogID
sentMsgHashMap[sentMsg.LogID] = sentMsg.MsgHash
}
logs, err := p.repo.Logs.FindByIDs(ctx, logIDs)
if err != nil {
render.Error(w, r, fmt.Errorf("can't find logs: %w", err))
return
}
for _, log := range logs {
sentTxLinks[sentMsgHashMap[log.ID]] = p.cfg.GetChainConfig(log.ChainID).FormatTxLink(log.TransactionHash)
}
r.Body = http.MaxBytesReader(w, r.Body, 5<<20)
manualSigners, err := p.parseManualSignatures(r, rawMessages)
if err != nil {
render.Error(w, r, fmt.Errorf("can't parse manual signatures: %w", err))
return
}
signersMap := p.makeSignersMap(msgHashes, signatures, manualSigners)
res := make([]*UnsignedMessageInfo, 0, len(messages))
for hash, msg := range messages {
var signers, missingSigners []common.Address
for _, signer := range validators {
if signersMap[hash][signer] {
signers = append(signers, signer)
} else {
missingSigners = append(missingSigners, signer)
}
}
if uint(len(signers)) < requiredSignatures {
res = append(res, &UnsignedMessageInfo{
Message: NewBridgeMessageInfo(msg),
Link: sentTxLinks[hash],
Signers: signers,
MissingSigners: missingSigners,
})
}
}
render.JSON(w, r, http.StatusOK, UnsignedMessagesInfo{
RequiredSignatures: requiredSignatures,
ActiveValidators: validators,
TotalPendingMessages: uint(len(msgHashes)),
TotalUnsignedMessages: uint(len(res)),
UnsignedMessages: res,
})
}
func (p *Presenter) makeSignersMap(msgHashes []common.Hash, signatures []*entity.SignedMessage, manualSigners map[common.Hash][]common.Address) map[common.Hash]map[common.Address]bool {
signersMap := make(map[common.Hash]map[common.Address]bool, len(msgHashes))
for _, hash := range msgHashes {
signersMap[hash] = make(map[common.Address]bool, 10)
}
for _, sig := range signatures {
signersMap[sig.MsgHash][sig.Signer] = true
}
for hash, signers := range manualSigners {
if signersMap[hash] == nil {
continue
}
for _, signer := range signers {
signersMap[hash][signer] = true
}
}
return signersMap
}
func (p *Presenter) parseManualSignatures(r *http.Request, messages map[common.Hash][]byte) (map[common.Hash][]common.Address, error) {
err := r.ParseMultipartForm(32 << 20)
if err != nil {
return nil, fmt.Errorf("can't parse multipart form: %w", err)
}
res := make(map[common.Hash][]common.Address, 100)
for _, hdr := range r.MultipartForm.File["signatures"] {
file, err2 := hdr.Open()
if err2 != nil {
return nil, fmt.Errorf("can't open file: %w", err2)
}
signatures := make(map[common.Hash]hexutil.Bytes)
err = json.NewDecoder(file).Decode(&signatures)
if err != nil {
return nil, fmt.Errorf("can't decode json file: %w", err)
}
for hash, sig := range signatures {
signer, err3 := utils.RestoreSignerAddress(messages[hash], sig)
if err3 != nil {
return nil, err3
}
res[hash] = append(res[hash], signer)
}
}
return res, nil
} }
func (p *Presenter) getFilteredLogs(ctx context.Context) ([]*entity.Log, error) { func (p *Presenter) getFilteredLogs(ctx context.Context) ([]*entity.Log, error) {
@ -357,7 +507,7 @@ func (p *Presenter) buildSearchResultForMessage(ctx context.Context, bridgeID st
if msgHash == nil && messageID == nil { if msgHash == nil && messageID == nil {
return nil, ErrMissingMsgHashAndMessageID return nil, ErrMissingMsgHashAndMessageID
} }
var msg *entity.Message var msg entity.BridgeMessage
var err error var err error
var searchID common.Hash var searchID common.Hash
if msgHash != nil { if msgHash != nil {
@ -368,28 +518,17 @@ func (p *Presenter) buildSearchResultForMessage(ctx context.Context, bridgeID st
msg, err = p.repo.Messages.GetByMessageID(ctx, bridgeID, *messageID) msg, err = p.repo.Messages.GetByMessageID(ctx, bridgeID, *messageID)
} }
if errors.Is(err, db.ErrNotFound) { if errors.Is(err, db.ErrNotFound) {
ercToNativeMsg, err2 := p.repo.ErcToNativeMessages.GetByMsgHash(ctx, bridgeID, searchID) msg, err = p.repo.ErcToNativeMessages.GetByMsgHash(ctx, bridgeID, searchID)
if err2 != nil {
return nil, err2
}
events, err2 := p.buildMessageEvents(ctx, ercToNativeMsg.BridgeID, ercToNativeMsg.MsgHash, ercToNativeMsg.MsgHash)
if err2 != nil {
return nil, err2
}
return &SearchResult{
Message: NewErcToNativeMessageInfo(ercToNativeMsg),
RelatedEvents: events,
}, nil
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
events, err := p.buildMessageEvents(ctx, msg.BridgeID, msg.MsgHash, msg.MessageID) events, err := p.buildMessageEvents(ctx, bridgeID, msg.GetMsgHash(), msg.GetMessageID())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &SearchResult{ return &SearchResult{
Message: NewMessageInfo(msg), Message: NewBridgeMessageInfo(msg),
RelatedEvents: events, RelatedEvents: events,
}, nil }, nil
} }
@ -441,7 +580,7 @@ func (p *Presenter) buildMessageEvents(ctx context.Context, bridgeID string, msg
if err = db.IgnoreErrNotFound(err); err != nil { if err = db.IgnoreErrNotFound(err); err != nil {
return nil, err return nil, err
} }
signed, err := p.repo.SignedMessages.FindByMsgHash(ctx, bridgeID, msgHash) signed, err := p.repo.SignedMessages.FindByMsgHashes(ctx, bridgeID, []common.Hash{msgHash})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -106,6 +106,21 @@ type TxInfo struct {
Link string Link string
} }
type UnsignedMessagesInfo struct {
RequiredSignatures uint
ActiveValidators []common.Address
TotalPendingMessages uint
TotalUnsignedMessages uint
UnsignedMessages []*UnsignedMessageInfo
}
type UnsignedMessageInfo struct {
Message interface{}
Link string
Signers []common.Address
MissingSigners []common.Address
}
func NewLogInfo(log *entity.Log) *LogInfo { func NewLogInfo(log *entity.Log) *LogInfo {
return &LogInfo{ return &LogInfo{
LogID: log.ID, LogID: log.ID,
@ -163,3 +178,14 @@ func NewErcToNativeMessageInfo(req *entity.ErcToNativeMessage) *ErcToNativeMessa
Value: req.Value, Value: req.Value,
} }
} }
func NewBridgeMessageInfo(req entity.BridgeMessage) interface{} {
switch msg := req.(type) {
case *entity.Message:
return NewMessageInfo(msg)
case *entity.ErcToNativeMessage:
return NewErcToNativeMessageInfo(msg)
default:
return nil
}
}

View File

@ -56,6 +56,7 @@ func (r *ercToNativeMessagesRepo) FindPendingMessages(ctx context.Context, bridg
From(r.table + " m"). From(r.table + " m").
LeftJoin("executed_messages em ON em.message_id = m.msg_hash AND em.bridge_id = m.bridge_id"). LeftJoin("executed_messages em ON em.message_id = m.msg_hash AND em.bridge_id = m.bridge_id").
Where(sq.Eq{"m.bridge_id": bridgeID, "em.log_id": nil}). Where(sq.Eq{"m.bridge_id": bridgeID, "em.log_id": nil}).
OrderBy("m.created_at").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
ToSql() ToSql()
if err != nil { if err != nil {

View File

@ -114,3 +114,21 @@ func (r *logsRepo) Find(ctx context.Context, filter entity.LogsFilter) ([]*entit
} }
return logs, nil return logs, nil
} }
func (r *logsRepo) FindByIDs(ctx context.Context, ids []uint) ([]*entity.Log, error) {
q, args, err := sq.Select("*").
From(r.table).
Where(sq.Eq{"id": ids}).
OrderBy("chain_id", "block_number", "log_index").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
logs := make([]*entity.Log, 0, 10)
err = r.db.SelectContext(ctx, &logs, q, args...)
if err != nil {
return nil, fmt.Errorf("can't find logs by ids: %w", err)
}
return logs, nil
}

View File

@ -73,6 +73,7 @@ func (r *messagesRepo) FindPendingMessages(ctx context.Context, bridgeID string)
From(r.table + " m"). From(r.table + " m").
LeftJoin("executed_messages em ON em.message_id = m.message_id AND em.bridge_id = m.bridge_id"). LeftJoin("executed_messages em ON em.message_id = m.message_id AND em.bridge_id = m.bridge_id").
Where(sq.Eq{"m.bridge_id": bridgeID, "em.log_id": nil}). Where(sq.Eq{"m.bridge_id": bridgeID, "em.log_id": nil}).
OrderBy("m.created_at").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
ToSql() ToSql()
if err != nil { if err != nil {

View File

@ -67,3 +67,20 @@ func (r *sentMessagesRepo) GetByMsgHash(ctx context.Context, bridgeID string, ms
} }
return msg, nil return msg, nil
} }
func (r *sentMessagesRepo) FindByMsgHashes(ctx context.Context, bridgeID string, msgHashes []common.Hash) ([]*entity.SentMessage, error) {
q, args, err := sq.Select("*").
From(r.table).
Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHashes}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
msgs := make([]*entity.SentMessage, 0, 10)
err = r.db.SelectContext(ctx, &msgs, q, args...)
if err != nil {
return nil, fmt.Errorf("can't find sent messages: %w", err)
}
return msgs, nil
}

View File

@ -51,10 +51,10 @@ func (r *signedMessagesRepo) GetByLogID(ctx context.Context, logID uint) (*entit
return msg, nil return msg, nil
} }
func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, msgHash []common.Hash) ([]*entity.SignedMessage, error) { func (r *signedMessagesRepo) FindByMsgHashes(ctx context.Context, bridgeID string, msgHashes []common.Hash) ([]*entity.SignedMessage, error) {
q, args, err := sq.Select("*"). q, args, err := sq.Select("*").
From(r.table). From(r.table).
Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHash}). Where(sq.Eq{"bridge_id": bridgeID, "msg_hash": msgHashes}).
OrderBy("signer"). OrderBy("signer").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
ToSql() ToSql()

View File

@ -1,6 +1,10 @@
package repository package repository
import ( import (
"context"
"fmt"
"github.com/poanetwork/tokenbridge-monitor/config"
"github.com/poanetwork/tokenbridge-monitor/db" "github.com/poanetwork/tokenbridge-monitor/db"
"github.com/poanetwork/tokenbridge-monitor/entity" "github.com/poanetwork/tokenbridge-monitor/entity"
"github.com/poanetwork/tokenbridge-monitor/repository/postgres" "github.com/poanetwork/tokenbridge-monitor/repository/postgres"
@ -41,3 +45,18 @@ func NewRepo(db *db.DB) *Repo {
BridgeValidators: postgres.NewBridgeValidatorsRepo("bridge_validators", db), BridgeValidators: postgres.NewBridgeValidatorsRepo("bridge_validators", db),
} }
} }
func (r *Repo) FindPendingMessages(ctx context.Context, bridgeID string, bridgeMode config.BridgeMode) ([]entity.BridgeMessage, error) {
if bridgeMode == config.BridgeModeErcToNative {
msgs, err := r.ErcToNativeMessages.FindPendingMessages(ctx, bridgeID)
if err != nil {
return nil, fmt.Errorf("can't find pending erc-to-native messages: %w", err)
}
return entity.ToBridgeMessages(msgs), nil
}
msgs, err := r.Messages.FindPendingMessages(ctx, bridgeID)
if err != nil {
return nil, fmt.Errorf("can't find pending amb messages: %w", err)
}
return entity.ToBridgeMessages(msgs), nil
}

20
utils/ecdsa.go Normal file
View File

@ -0,0 +1,20 @@
package utils
import (
"fmt"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
func RestoreSignerAddress(data, sig []byte) (common.Address, error) {
if len(sig) >= 65 && sig[64] >= 27 {
sig[64] -= 27
}
pk, err := crypto.SigToPub(accounts.TextHash(data), sig)
if err != nil {
return common.Address{}, fmt.Errorf("can't recover ecdsa signer: %w", err)
}
return crypto.PubkeyToAddress(*pk), nil
}