diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go index f851d0c..5f3ad7e 100644 --- a/cmd/monitor/main.go +++ b/cmd/monitor/main.go @@ -46,7 +46,7 @@ func main() { repo := repository.NewRepo(dbConn) if cfg.Presenter != nil { - pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo, cfg.Bridges) + pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo, cfg) go func() { err := pr.Serve(cfg.Presenter.Host) if err != nil { diff --git a/config/config.go b/config/config.go index 1696c14..3bd693f 100644 --- a/config/config.go +++ b/config/config.go @@ -12,7 +12,7 @@ import ( ) type RPCConfig struct { - Host string `yaml:"host"` + Host string `yaml:"host" json:"-"` // hidden from public presenter endpoint Timeout time.Duration `yaml:"timeout"` RPS float64 `yaml:"rps"` } diff --git a/entity/bridge_validator.go b/entity/bridge_validator.go index 94a2ee3..7f8f498 100644 --- a/entity/bridge_validator.go +++ b/entity/bridge_validator.go @@ -20,5 +20,5 @@ type BridgeValidator struct { type BridgeValidatorsRepo interface { Ensure(ctx context.Context, val *BridgeValidator) error FindActiveValidator(ctx context.Context, bridgeID, chainID string, address common.Address) (*BridgeValidator, error) - FindActiveValidators(ctx context.Context, bridgeID string) ([]*BridgeValidator, error) + FindActiveValidators(ctx context.Context, bridgeID, chainID string) ([]*BridgeValidator, error) } diff --git a/presenter/presenter.go b/presenter/presenter.go index e49dcaa..df9b529 100644 --- a/presenter/presenter.go +++ b/presenter/presenter.go @@ -6,8 +6,8 @@ import ( "errors" "fmt" "net/http" - "regexp" "strconv" + "time" "github.com/ethereum/go-ethereum/common" "github.com/go-chi/chi/v5" @@ -20,19 +20,29 @@ import ( "github.com/poanetwork/tokenbridge-monitor/repository" ) +type ctxKey int + +const ( + BridgeCfgCtxKey ctxKey = iota + ChainCfgCtxKey + BlockNumberCtxKey + TxHashCtxKey + FilterCtxKey +) + type Presenter struct { - logger logging.Logger - repo *repository.Repo - bridges map[string]*config.BridgeConfig - root chi.Router + logger logging.Logger + repo *repository.Repo + cfg *config.Config + root chi.Router } -func NewPresenter(logger logging.Logger, repo *repository.Repo, bridges map[string]*config.BridgeConfig) *Presenter { +func NewPresenter(logger logging.Logger, repo *repository.Repo, cfg *config.Config) *Presenter { return &Presenter{ - logger: logger, - repo: repo, - bridges: bridges, - root: chi.NewMux(), + logger: logger, + repo: repo, + cfg: cfg, + root: chi.NewMux(), } } @@ -41,97 +51,239 @@ func (p *Presenter) Serve(addr string) error { p.root.Use(middleware.Throttle(5)) p.root.Use(middleware.RequestID) p.root.Use(NewRequestLogger(p.logger)) - p.root.Get("/tx/{txHash:0x[0-9a-fA-F]{64}}", p.wrapJSONHandler(p.SearchTx)) - p.root.Get("/block/{chainID:[0-9]+}/{blockNumber:[0-9]+}", p.wrapJSONHandler(p.SearchBlock)) - p.root.Get("/bridge/{bridgeID:[0-9a-zA-Z_\\-]+}/validators", p.wrapJSONHandler(p.SearchValidators)) - p.root.Get("/logs", p.wrapJSONHandler(p.SearchLogs)) + registerSearchRoutes := func(r chi.Router) { + r.Use(p.GetFilterMiddleware) + r.Get("/", p.GetMessages) + r.Get("/logs", p.GetLogs) + r.Get("/messages", p.GetMessages) + } + p.root.Route("/bridge/{bridgeID:[0-9a-zA-Z_\\-]+}", func(r chi.Router) { + r.Use(p.GetBridgeConfigMiddleware) + r.Get("/", p.GetBridgeInfo) + r.Get("/info", p.GetBridgeInfo) + r.Get("/config", p.GetBridgeConfig) + r.Get("/validators", p.GetBridgeValidators) + }) + p.root.Route("/chain/{chainID:[0-9]+}", func(r chi.Router) { + r.Use(p.GetChainConfigMiddleware) + r.Route("/block/{blockNumber:[0-9]+}", func(r2 chi.Router) { + r2.Use(p.GetBlockNumberMiddleware) + r2.Group(registerSearchRoutes) + }) + r.Route("/tx/{txHash:0x[0-9a-fA-F]{64}}", func(r2 chi.Router) { + r2.Use(p.GetTxHashMiddleware) + r2.Group(registerSearchRoutes) + }) + }) + p.root.Route("/tx/{txHash:0x[0-9a-fA-F]{64}}", func(r chi.Router) { + r.Use(p.GetTxHashMiddleware) + r.Group(registerSearchRoutes) + }) return http.ListenAndServe(addr, p.root) } -func (p *Presenter) wrapJSONHandler(handler func(r *http.Request) (interface{}, error)) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - res, err := handler(r) - if err != nil { - p.logger.WithError(err).Error("failed to handle request") - w.WriteHeader(http.StatusInternalServerError) - } +func (p *Presenter) JSON(w http.ResponseWriter, r *http.Request, status int, v interface{}) { + enc := json.NewEncoder(w) - enc := json.NewEncoder(w) + if pretty, _ := strconv.ParseBool(chi.URLParam(r, "pretty")); pretty { enc.SetIndent("", " ") - if err = enc.Encode(res); err != nil { - p.logger.WithError(err).Error("failed to marshal JSON result") - w.WriteHeader(http.StatusInternalServerError) + } + + w.WriteHeader(status) + if err := enc.Encode(v); err != nil { + p.Error(w, r, fmt.Errorf("failed to marshal JSON result: %w", err)) + return + } + + w.Header().Set("Content-Type", "application/json") +} + +func (p *Presenter) Error(w http.ResponseWriter, r *http.Request, err error) { + w.WriteHeader(http.StatusInternalServerError) + _, err = w.Write([]byte(err.Error())) + p.logger.WithError(err).Error("request handling failed") + if err != nil { + p.logger.WithError(err).Error("can't write error response") + } +} + +func (p *Presenter) GetBridgeConfigMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bridgeID := chi.URLParam(r, "bridgeID") + + cfg, ok := p.cfg.Bridges[bridgeID] + if !ok || cfg == nil { + p.JSON(w, r, http.StatusNotFound, fmt.Sprintf("bridge with id %s not found", bridgeID)) + return + } + + ctx := context.WithValue(r.Context(), BridgeCfgCtxKey, cfg) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (p *Presenter) GetChainConfigMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + chainID := chi.URLParam(r, "chainID") + + var cfg *config.ChainConfig + for _, chainCfg := range p.cfg.Chains { + if chainCfg.ChainID == chainID { + cfg = chainCfg + break + } + } + if cfg == nil { + p.JSON(w, r, http.StatusNotFound, fmt.Sprintf("chain with id %s not found", chainID)) + return + } + + ctx := context.WithValue(r.Context(), ChainCfgCtxKey, cfg) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (p *Presenter) GetBlockNumberMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + blockNumber, err := strconv.ParseUint(chi.URLParam(r, "blockNumber"), 10, 32) + if err != nil { + p.Error(w, r, fmt.Errorf("failed to parse blockNumber: %w", err)) + return + } + + ctx := context.WithValue(r.Context(), BlockNumberCtxKey, uint(blockNumber)) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (p *Presenter) GetTxHashMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + txHash := chi.URLParam(r, "txHash") + + ctx := context.WithValue(r.Context(), TxHashCtxKey, common.HexToHash(txHash)) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (p *Presenter) GetFilterMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + filter := &FilterContext{} + + if cfg, ok := ctx.Value(ChainCfgCtxKey).(*config.ChainConfig); ok { + filter.ChainID = &cfg.ChainID + } + if blockNumber, ok := ctx.Value(BlockNumberCtxKey).(uint); ok { + filter.FromBlock = &blockNumber + filter.ToBlock = &blockNumber + } + if txHash, ok := ctx.Value(TxHashCtxKey).(common.Hash); ok { + filter.TxHash = &txHash + } + + ctx = context.WithValue(ctx, FilterCtxKey, filter) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (p *Presenter) getBridgeSideInfo(ctx context.Context, bridgeID string, cfg *config.BridgeSideConfig) (*BridgeSideInfo, error) { + cursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, cfg.Chain.ChainID, cfg.Address) + if err != nil { + return nil, fmt.Errorf("failed to get home bridge cursor: %w", err) + } + + var lastFetchedBlockTime, lastProcessedBlockTime time.Time + bt, err := p.repo.BlockTimestamps.GetByBlockNumber(ctx, cfg.Chain.ChainID, cursor.LastFetchedBlock) + if err != nil && !errors.Is(err, db.ErrNotFound) { + return nil, fmt.Errorf("failed to get home bridge cursor: %w", err) + } else if err == nil { + lastFetchedBlockTime = bt.Timestamp + } + + if cursor.LastFetchedBlock == cursor.LastProcessedBlock { + lastProcessedBlockTime = lastFetchedBlockTime + } else { + bt, err = p.repo.BlockTimestamps.GetByBlockNumber(ctx, cfg.Chain.ChainID, cursor.LastProcessedBlock) + if err != nil && !errors.Is(err, db.ErrNotFound) { + return nil, fmt.Errorf("failed to get home bridge cursor: %w", err) + } else if err == nil { + lastProcessedBlockTime = bt.Timestamp } } + + validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID, cfg.Chain.ChainID) + if err != nil { + return nil, fmt.Errorf("failed to find validators for bridge id: %w", err) + } + validatorAddresses := make([]common.Address, len(validators)) + for i, v := range validators { + validatorAddresses[i] = v.Address + } + + return &BridgeSideInfo{ + Chain: cfg.ChainName, + ChainID: cfg.Chain.ChainID, + BridgeAddress: cfg.Address, + LastFetchedBlock: cursor.LastFetchedBlock, + LastFetchBlockTime: lastFetchedBlockTime, + LastProcessedBlock: cursor.LastProcessedBlock, + LastProcessedBlockTime: lastProcessedBlockTime, + Validators: validatorAddresses, + }, nil } -func (p *Presenter) SearchTx(r *http.Request) (interface{}, error) { +func (p *Presenter) GetBridgeConfig(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - txHash := common.HexToHash(chi.URLParam(r, "txHash")) + cfg, _ := ctx.Value(BridgeCfgCtxKey).(*config.BridgeConfig) - logs, err := p.repo.Logs.FindByTxHash(ctx, txHash) - if err != nil { - p.logger.WithError(err).Error("failed to find logs by tx hash") - return nil, err - } - return p.searchInLogs(ctx, logs), nil + p.JSON(w, r, http.StatusOK, cfg) } -func (p *Presenter) SearchBlock(r *http.Request) (interface{}, error) { +func (p *Presenter) GetBridgeInfo(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - chainID := chi.URLParam(r, "chainID") - blockNumber, err := strconv.ParseUint(chi.URLParam(r, "blockNumber"), 10, 32) + cfg, _ := ctx.Value(BridgeCfgCtxKey).(*config.BridgeConfig) + + homeInfo, err := p.getBridgeSideInfo(ctx, cfg.ID, cfg.Home) if err != nil { - p.logger.WithError(err).Error("failed to parse blockNumber") - return nil, err + p.Error(w, r, fmt.Errorf("failed to get home bridge info: %w", err)) + return + } + foreignInfo, err := p.getBridgeSideInfo(ctx, cfg.ID, cfg.Foreign) + if err != nil { + p.Error(w, r, fmt.Errorf("failed to get foreign bridge info: %w", err)) + return } - logs, err := p.repo.Logs.FindByBlockNumber(ctx, chainID, uint(blockNumber)) - if err != nil { - p.logger.WithError(err).Error("failed to find logs by block number") - return nil, err - } - - return p.searchInLogs(ctx, logs), nil + p.JSON(w, r, http.StatusOK, &BridgeInfo{ + BridgeID: cfg.ID, + Mode: cfg.BridgeMode, + Home: homeInfo, + Foreign: foreignInfo, + }) } -func (p *Presenter) SearchValidators(r *http.Request) (interface{}, error) { +func (p *Presenter) GetBridgeValidators(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - bridgeID := chi.URLParam(r, "bridgeID") + cfg, _ := ctx.Value(BridgeCfgCtxKey).(*config.BridgeConfig) - if p.bridges[bridgeID] == nil { - return nil, fmt.Errorf("bridge %q not found", bridgeID) - } - - cfg := p.bridges[bridgeID] - res := ValidatorsResult{ - BridgeID: bridgeID, - Home: &ValidatorSideResult{ - ChainID: cfg.Home.Chain.ChainID, - }, - Foreign: &ValidatorSideResult{ - ChainID: cfg.Foreign.Chain.ChainID, - }, - } - - homeCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Home.ChainID, cfg.Home.Address) + homeValidators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, cfg.ID, cfg.Home.Chain.ChainID) if err != nil { - p.logger.WithError(err).Error("failed to get home bridge cursor") - return nil, err - } - foreignCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Foreign.ChainID, cfg.Foreign.Address) - if err != nil { - p.logger.WithError(err).Error("failed to get foreign bridge cursor") - return nil, err + p.Error(w, r, fmt.Errorf("failed to find home validators: %w", err)) + return } - res.Home.BlockNumber = homeCursor.LastProcessedBlock - res.Foreign.BlockNumber = foreignCursor.LastProcessedBlock - - validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID) + foreignValidators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, cfg.ID, cfg.Home.Chain.ChainID) if err != nil { - p.logger.WithError(err).Error("failed to find validators for bridge id") - return nil, err + p.Error(w, r, fmt.Errorf("failed to find home validators: %w", err)) + return + } + + //nolint:gocritic + validators := append(homeValidators, foreignValidators...) + res := &ValidatorsInfo{ + BridgeID: cfg.ID, + Mode: cfg.BridgeMode, } seenValidators := make(map[common.Address]bool, len(validators)) @@ -142,95 +294,77 @@ func (p *Presenter) SearchValidators(r *http.Request) (interface{}, error) { seenValidators[val.Address] = true valInfo := &ValidatorInfo{ - Signer: val.Address, + Address: val.Address, } - confirmation, err := p.repo.SignedMessages.FindLatest(ctx, bridgeID, res.Home.ChainID, val.Address) - if err != nil { + confirmation, err2 := p.repo.SignedMessages.FindLatest(ctx, cfg.ID, cfg.Home.Chain.ChainID, val.Address) + if err2 != nil { if !errors.Is(err, db.ErrNotFound) { - p.logger.WithError(err).Error("failed to find latest validator confirmation") - return nil, err + p.Error(w, r, fmt.Errorf("failed to find latest validator confirmation: %w", err)) + return } } else { valInfo.LastConfirmation, err = p.getTxInfo(ctx, confirmation.LogID) if err != nil { - p.logger.WithError(err).Error("failed to get tx info") - return nil, err + p.Error(w, r, fmt.Errorf("failed to get tx info: %w", err)) + return } } res.Validators = append(res.Validators, valInfo) } - return res, nil + p.JSON(w, r, http.StatusOK, res) } -var HashRegex = regexp.MustCompile(`^0[xX][\da-fA-F]{64}$`) +func (p *Presenter) getFilteredLogs(ctx context.Context) ([]*entity.Log, error) { + filter, _ := ctx.Value(FilterCtxKey).(*FilterContext) -func (p *Presenter) SearchLogs(r *http.Request) (interface{}, error) { - ctx := r.Context() - q := r.URL.Query() - chainID := q.Get("chainId") - txHash := q.Get("txHash") - block := q.Get("block") - fromBlock := q.Get("fromBlock") - toBlock := q.Get("toBlock") - - var err error - var logs []*entity.Log - if txHash != "" { - if !HashRegex.MatchString(txHash) { - return nil, fmt.Errorf("txHash has invalid format") - } - if block != "" || fromBlock != "" || toBlock != "" { - return nil, fmt.Errorf("block, fromBlock, toBlock must be empty when txHash is specified") - } - - logs, err = p.repo.Logs.FindByTxHash(ctx, common.HexToHash(txHash)) + if filter.TxHash != nil { + logs, err := p.repo.Logs.FindByTxHash(ctx, *filter.TxHash) if err != nil { return nil, err } - if chainID != "" { - filteredLogs := make([]*entity.Log, 0, len(logs)) + if filter.ChainID != nil { + newLogs := make([]*entity.Log, 0, len(logs)) for _, log := range logs { - if log.ChainID == chainID { - filteredLogs = append(filteredLogs, log) + if log.ChainID == *filter.ChainID { + newLogs = append(newLogs, log) } } - logs = filteredLogs + logs = newLogs } - } else if block != "" || (fromBlock != "" && toBlock != "") { - var from, to uint64 - if chainID == "" { - return nil, fmt.Errorf("chainId must be specified when block or fromBlock and toBlock are specified") - } - if block != "" { - if fromBlock != "" || toBlock != "" { - return nil, fmt.Errorf("fromBlock, toBlock must be empty when block is specified") - } - from, err = strconv.ParseUint(block, 10, 32) - if err != nil { - return nil, err - } - to = from - } else { - from, err = strconv.ParseUint(fromBlock, 10, 32) - if err != nil { - return nil, err - } - to, err = strconv.ParseUint(toBlock, 10, 32) - if err != nil { - return nil, err - } - } - logs, err = p.repo.Logs.FindByBlockRange(ctx, chainID, nil, uint(from), uint(to)) - if err != nil { - return nil, err - } - } else { - return nil, fmt.Errorf("either txHash or block or fromBlock and toBlock must be specified") + return logs, nil } - result := make([]*LogResult, len(logs)) + + if filter.ChainID == nil { + return nil, errors.New("chainId query parameter is missing") + } + if filter.FromBlock == nil || filter.ToBlock == nil { + return nil, errors.New("block query parameters are missing") + } + return p.repo.Logs.FindByBlockRange(ctx, *filter.ChainID, nil, *filter.FromBlock, *filter.ToBlock) +} + +func (p *Presenter) GetMessages(w http.ResponseWriter, r *http.Request) { + logs, err := p.getFilteredLogs(r.Context()) + if err != nil { + p.Error(w, r, fmt.Errorf("can't filter logs: %w", err)) + return + } + + res := p.searchForMessagesInLogs(r.Context(), logs) + p.JSON(w, r, http.StatusOK, res) +} + +func (p *Presenter) GetLogs(w http.ResponseWriter, r *http.Request) { + logs, err := p.getFilteredLogs(r.Context()) + if err != nil { + p.Error(w, r, fmt.Errorf("can't filter logs: %w", err)) + return + } + + res := make([]*LogResult, len(logs)) for i, log := range logs { - result[i] = &LogResult{ + res[i] = &LogResult{ LogID: log.ID, ChainID: log.ChainID, Address: log.Address, @@ -243,10 +377,11 @@ func (p *Presenter) SearchLogs(r *http.Request) (interface{}, error) { BlockNumber: log.BlockNumber, } } - return result, nil + + p.JSON(w, r, http.StatusOK, res) } -func (p *Presenter) searchInLogs(ctx context.Context, logs []*entity.Log) []*SearchResult { +func (p *Presenter) searchForMessagesInLogs(ctx context.Context, logs []*entity.Log) []*SearchResult { results := make([]*SearchResult, 0, len(logs)) for _, log := range logs { for _, task := range []func(context.Context, *entity.Log) (*SearchResult, error){ @@ -267,7 +402,7 @@ func (p *Presenter) searchInLogs(ctx context.Context, logs []*entity.Log) []*Sea } } if res.Event == nil { - p.logger.WithError(err).Error("tx event not found in related events") + p.logger.Error("tx event not found in related events") } results = append(results, res) break diff --git a/presenter/types.go b/presenter/types.go index ebaf349..c321272 100644 --- a/presenter/types.go +++ b/presenter/types.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/poanetwork/tokenbridge-monitor/config" "github.com/poanetwork/tokenbridge-monitor/entity" ) @@ -36,12 +37,6 @@ type ErcToNativeMessageInfo struct { Value string } -type TxInfo struct { - BlockNumber uint - Timestamp time.Time - Link string -} - type EventInfo struct { Action string LogID uint `json:"-"` @@ -59,23 +54,6 @@ type SearchResult struct { RelatedEvents []*EventInfo } -type ValidatorInfo struct { - Signer common.Address - LastConfirmation *TxInfo -} - -type ValidatorSideResult struct { - ChainID string - BlockNumber uint -} - -type ValidatorsResult struct { - BridgeID string - Home *ValidatorSideResult - Foreign *ValidatorSideResult - Validators []*ValidatorInfo -} - type LogResult struct { LogID uint ChainID string @@ -88,3 +66,45 @@ type LogResult struct { TxHash common.Hash BlockNumber uint } + +type BridgeInfo struct { + BridgeID string + Mode config.BridgeMode + Home *BridgeSideInfo + Foreign *BridgeSideInfo +} + +type BridgeSideInfo struct { + Chain string + ChainID string + BridgeAddress common.Address + LastFetchedBlock uint + LastFetchBlockTime time.Time + LastProcessedBlock uint + LastProcessedBlockTime time.Time + Validators []common.Address +} + +type ValidatorsInfo struct { + BridgeID string + Mode config.BridgeMode + Validators []*ValidatorInfo +} + +type ValidatorInfo struct { + Address common.Address + LastConfirmation *TxInfo +} + +type TxInfo struct { + BlockNumber uint + Timestamp time.Time + Link string +} + +type FilterContext struct { + ChainID *string + FromBlock *uint + ToBlock *uint + TxHash *common.Hash +} diff --git a/repository/postgres/bridge_validators.go b/repository/postgres/bridge_validators.go index 5f85c7c..4ddbd97 100644 --- a/repository/postgres/bridge_validators.go +++ b/repository/postgres/bridge_validators.go @@ -61,11 +61,12 @@ func (r *bridgeValidatorsRepo) FindActiveValidator(ctx context.Context, bridgeID return val, nil } -func (r *bridgeValidatorsRepo) FindActiveValidators(ctx context.Context, bridgeID string) ([]*entity.BridgeValidator, error) { +func (r *bridgeValidatorsRepo) FindActiveValidators(ctx context.Context, bridgeID string, chainID string) ([]*entity.BridgeValidator, error) { q, args, err := sq.Select("*"). From(r.table). Where(sq.Eq{ "bridge_id": bridgeID, + "chain_id": chainID, "removed_log_id": nil, }). PlaceholderFormat(sq.Dollar).