diff --git a/common/settings/structs.go b/common/settings/structs.go index 4ac00cb8..4e909a05 100644 --- a/common/settings/structs.go +++ b/common/settings/structs.go @@ -13,11 +13,14 @@ type MongoDB struct { MongodbDatabase string `split_words:"true" required:"true"` } -// Logger contains configuration settings for a logger. type Logger struct { LogLevel string `split_words:"true" default:"INFO"` } +type P2p struct { + P2pNetwork string `split_words:"true" required:"true"` +} + // Monitoring contains configuration settings for the monitoring endpoints. type Monitoring struct { // MonitoringPort defines the TCP port for the monitoring endpoints. diff --git a/deploy/event-watcher/env/production-mainnet.env b/deploy/event-watcher/env/production-mainnet.env index ef5d3a6a..a300e2ee 100644 --- a/deploy/event-watcher/env/production-mainnet.env +++ b/deploy/event-watcher/env/production-mainnet.env @@ -13,4 +13,8 @@ PPROF_ENABLED=false MONITORING_PORT=8000 MONGODB_URI= MONGODB_DATABASE= -LOG_LEVEL=INFO \ No newline at end of file +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/production-testnet.env b/deploy/event-watcher/env/production-testnet.env index f6193ba9..56f11e84 100644 --- a/deploy/event-watcher/env/production-testnet.env +++ b/deploy/event-watcher/env/production-testnet.env @@ -13,4 +13,8 @@ PPROF_ENABLED=false MONITORING_PORT=8000 MONGODB_URI= MONGODB_DATABASE= -LOG_LEVEL=INFO \ No newline at end of file +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/staging-mainnet.env b/deploy/event-watcher/env/staging-mainnet.env index 915ba8a6..d5dc77b6 100644 --- a/deploy/event-watcher/env/staging-mainnet.env +++ b/deploy/event-watcher/env/staging-mainnet.env @@ -13,4 +13,8 @@ PPROF_ENABLED=false MONITORING_PORT=8000 MONGODB_URI= MONGODB_DATABASE= -LOG_LEVEL=INFO \ No newline at end of file +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/staging-testnet.env b/deploy/event-watcher/env/staging-testnet.env index 92983a1c..67738fe0 100644 --- a/deploy/event-watcher/env/staging-testnet.env +++ b/deploy/event-watcher/env/staging-testnet.env @@ -13,4 +13,8 @@ PPROF_ENABLED=false MONITORING_PORT=8000 MONGODB_URI= MONGODB_DATABASE= -LOG_LEVEL=INFO \ No newline at end of file +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/event-watcher-service.yaml b/deploy/event-watcher/event-watcher-service.yaml index 8734976e..3faa86a1 100644 --- a/deploy/event-watcher/event-watcher-service.yaml +++ b/deploy/event-watcher/event-watcher-service.yaml @@ -46,6 +46,12 @@ spec: value: "{{ .MONITORING_PORT }}" - name: LOG_LEVEL value: "{{ .LOG_LEVEL }}" + - name: ETHEREUM_REQUESTS_PER_MINUTE + value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}" + - name: ETHEREUM_URL + value: "{{ .ETHEREUM_URL }}" + - name: ETHEREUM_AUTH + value: "{{ .ETHEREUM_AUTH }}" - name: MONGODB_URI valueFrom: secretKeyRef: diff --git a/event-watcher/clients/eth_rpc.go b/event-watcher/clients/eth_rpc.go new file mode 100644 index 00000000..cb2fe46e --- /dev/null +++ b/event-watcher/clients/eth_rpc.go @@ -0,0 +1,133 @@ +package clients + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type logsResponse struct { + Result []Log `json:"result"` +} + +type Log struct { + Address string `json:"address"` + BlockHash string `json:"blockHash"` + BlockNumber string `json:"blockNumber"` + Data string `json:"data"` + Topics []string `json:"topics"` + TransactionHash string `json:"transactionHash"` +} + +type EthRpcClient struct { + Url string + Auth string +} + +// TODO add rate limits +func NewEthRpcClient(url string, auth string) *EthRpcClient { + return &EthRpcClient{Url: url, Auth: auth} +} + +func (c *EthRpcClient) GetBlockNumber(ctx context.Context) (uint64, error) { + + // Create a new HTTP request + payload := strings.NewReader(`{ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_blockNumber" + }`) + req, err := http.NewRequestWithContext(ctx, "POST", c.Url, payload) + if err != nil { + return 0, fmt.Errorf("failed to create HTTP request: %w", err) + } + + // Add headers + req.Header.Add("accept", "application/json") + req.Header.Add("content-type", "application/json") + req.Header.Add("Authorization", "Bearer: "+c.Auth) + + // Send the request + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to send HTTP request: %w", err) + } + defer res.Body.Close() + if res.Status != "200 OK" { + return 0, fmt.Errorf("encoutered unexpected HTTP status code in response: %s", res.Status) + } + + // Deserialize response body + body, err := io.ReadAll(res.Body) + if err != nil { + return 0, fmt.Errorf("failed to read HTTP response body: %w", err) + } + var response struct { + Result string `json:"result"` + } + if err := json.Unmarshal(body, &response); err != nil { + return 0, fmt.Errorf("failed to deserialize HTTP response body: %w", err) + } + + // Parse the block number + n, err := hexutil.DecodeUint64(response.Result) + if err != nil { + return 0, fmt.Errorf("failed to parse block number from hex: %w", err) + } + + return n, nil +} + +func (c *EthRpcClient) GetLogs( + ctx context.Context, + fromBlock uint64, + toBlock uint64, + address string, + topic string, +) ([]Log, error) { + + params := fmt.Sprintf(`{ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_getLogs", + "params": [{ + "address": ["%s"], + "fromBlock":"0x%x", + "toBlock":"0x%x", + "topics": ["%s"] + }] + }`, address, fromBlock, toBlock, topic) + payload := strings.NewReader(params) + + req, err := http.NewRequest("POST", c.Url, payload) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Add("accept", "application/json") + req.Header.Add("content-type", "application/json") + req.Header.Add("Authorization", "Bearer: "+c.Auth) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send HTTP request: %w", err) + } + defer res.Body.Close() + + // Deserialize response body + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read HTTP response body: %w", err) + } + var response logsResponse + if err := json.Unmarshal(body, &response); err != nil { + return nil, fmt.Errorf("failed to deserialize HTTP response body: %w", err) + } + + return response.Result, nil +} diff --git a/event-watcher/cmd/service/main.go b/event-watcher/cmd/service/main.go index abb49994..cc8ed9ef 100644 --- a/event-watcher/cmd/service/main.go +++ b/event-watcher/cmd/service/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "os" "os/signal" @@ -9,11 +10,13 @@ import ( "time" "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/common/settings" "github.com/wormhole-foundation/wormhole-explorer/event-watcher/config" "github.com/wormhole-foundation/wormhole-explorer/event-watcher/http" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/watchers" "go.uber.org/zap" ) @@ -48,6 +51,11 @@ func main() { ) server.Start() + // Start the watchers for each chain + if err := startWatchers(rootCtx, rootLogger, db, cfg); err != nil { + rootLogger.Fatal("Failed to start watchers", zap.Error(err)) + } + // Block until we get a termination signal or the context is cancelled rootLogger.Info("waiting for termination signal or context cancellation...") sigterm := make(chan os.Signal, 1) @@ -66,3 +74,66 @@ func main() { rootCtxCancel() rootLogger.Info("terminated") } + +func startWatchers( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + switch cfg.P2p.P2pNetwork { + case domain.P2pMainNet: + return startWatchersMainnet(ctx, logger, db, cfg) + case domain.P2pTestNet: + return startWatchersTestnet(ctx, logger, db, cfg) + default: + return fmt.Errorf("unknown p2p network: %s", cfg.P2p) + } +} + +func startWatchersMainnet( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + // Start Ethereum watcher + { + w := watchers.NewEvmWatcher( + logger, + db, + config.ETHEREUM_MAINNET.ContractAddress, + config.ETHEREUM_MAINNET.Topic, + cfg.EthereumUrl, + cfg.EthereumAuth, + ) + w.Watch(ctx) + } + + return nil +} + +func startWatchersTestnet( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + // Start Ethereum watcher + { + w := watchers.NewEvmWatcher( + logger, + db, + config.ETHEREUM_GOERLI.ContractAddress, + config.ETHEREUM_GOERLI.Topic, + cfg.EthereumUrl, + cfg.EthereumAuth, + ) + w.Watch(ctx) + } + + return nil +} diff --git a/event-watcher/config/chains.go b/event-watcher/config/chains.go new file mode 100644 index 00000000..a7fc7a10 --- /dev/null +++ b/event-watcher/config/chains.go @@ -0,0 +1,19 @@ +package config + +type EvmParams struct { + StartingBlock uint64 + ContractAddress string + Topic string +} + +var ETHEREUM_MAINNET = EvmParams{ + StartingBlock: 12_959_638, + ContractAddress: "0x98f3c9e6e3face36baad05fe09d375ef1464288b", + Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2", +} + +var ETHEREUM_GOERLI = EvmParams{ + StartingBlock: 5_896_171, + ContractAddress: "0x706abc4e45d419950511e474c7b9ed348a4a716c", + Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2", +} diff --git a/event-watcher/config/structs.go b/event-watcher/config/structs.go index 6e64562b..d50040c5 100644 --- a/event-watcher/config/structs.go +++ b/event-watcher/config/structs.go @@ -9,4 +9,12 @@ type ServiceSettings struct { settings.Logger settings.MongoDB settings.Monitoring + settings.P2p + WatcherSettings +} + +type WatcherSettings struct { + EthereumRequestsPerMinute uint `split_words:"true" default:"INFO"` + EthereumUrl string `split_words:"true" default:"INFO"` + EthereumAuth string `split_words:"true" default:"INFO"` } diff --git a/event-watcher/watchers/evm.go b/event-watcher/watchers/evm.go new file mode 100644 index 00000000..02b0b3c4 --- /dev/null +++ b/event-watcher/watchers/evm.go @@ -0,0 +1,107 @@ +package watchers + +import ( + "context" + + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/clients" + "go.uber.org/zap" + "golang.org/x/exp/constraints" +) + +const bulkSize = 100 + +func min[T constraints.Ordered](a, b T) T { + if a < b { + return a + } + return b +} + +type EvmWatcher struct { + logger *zap.Logger + db *dbutil.Session + client *clients.EthRpcClient + coreContractAddress string + logTopic string +} + +func NewEvmWatcher( + logger *zap.Logger, + db *dbutil.Session, + coreContractAddress string, + logTopic string, + url string, + auth string, +) *EvmWatcher { + + w := EvmWatcher{ + logger: logger, + db: db, + client: clients.NewEthRpcClient(url, auth), + coreContractAddress: coreContractAddress, + logTopic: logTopic, + } + + return &w +} + +func (w *EvmWatcher) Watch(ctx context.Context) { + + //TODO: + // - initialize current block in the database, if not already initialized. + // - get current block from database + var currentBlock uint64 = 0 + + for { + // Get the current blockchain head + latestBlock, err := w.client.GetBlockNumber(ctx) + if err != nil { + w.logger.Error("failed to get latest block number", + zap.String("url", w.client.Url), + zap.Error(err), + ) + continue + } + + // Process blocks in bulk + for currentBlock < latestBlock { + from := currentBlock + to := min(currentBlock+bulkSize, latestBlock) + w.processBlockRange(ctx, from, to) + + currentBlock = latestBlock + } + } +} + +func (w *EvmWatcher) processBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) { + + var logs []clients.Log + var err error + + // Retry until success + for { + logs, err = w.client.GetLogs(ctx, fromBlock, toBlock, w.coreContractAddress, w.logTopic) + if err != nil { + w.logger.Error("failed to get logs", + zap.String("url", w.client.Url), + zap.String("coreContractAddress", w.coreContractAddress), + zap.String("topic", w.logTopic), + zap.Uint64("fromBlock", fromBlock), + zap.Uint64("toBlock", toBlock), + zap.Error(err), + ) + } + break + } + + // Process logs + // TODO: + // - update current block in database + // - fire events for other services + for i := range logs { + log := logs[i] + w.logger.Info("found log", zap.String("transactionHash", log.TransactionHash)) + } +}