This commit is contained in:
Agustin Godnic 2023-08-09 11:41:53 -03:00
parent 10a6180a98
commit 02b998e454
11 changed files with 368 additions and 5 deletions

View File

@ -13,11 +13,14 @@ type MongoDB struct {
MongodbDatabase string `split_words:"true" required:"true"`
}
// Logger contains configuration settings for a logger.
type Logger struct {
LogLevel string `split_words:"true" default:"INFO"`
}
type P2p struct {
P2pNetwork string `split_words:"true" required:"true"`
}
// Monitoring contains configuration settings for the monitoring endpoints.
type Monitoring struct {
// MonitoringPort defines the TCP port for the monitoring endpoints.

View File

@ -13,4 +13,8 @@ PPROF_ENABLED=false
MONITORING_PORT=8000
MONGODB_URI=
MONGODB_DATABASE=
LOG_LEVEL=INFO
LOG_LEVEL=INFO
ETHEREUM_REQUESTS_PER_MINUTE=12
ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native
ETHEREUM_AUTH=

View File

@ -13,4 +13,8 @@ PPROF_ENABLED=false
MONITORING_PORT=8000
MONGODB_URI=
MONGODB_DATABASE=
LOG_LEVEL=INFO
LOG_LEVEL=INFO
ETHEREUM_REQUESTS_PER_MINUTE=12
ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native
ETHEREUM_AUTH=

View File

@ -13,4 +13,8 @@ PPROF_ENABLED=false
MONITORING_PORT=8000
MONGODB_URI=
MONGODB_DATABASE=
LOG_LEVEL=INFO
LOG_LEVEL=INFO
ETHEREUM_REQUESTS_PER_MINUTE=12
ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native
ETHEREUM_AUTH=

View File

@ -13,4 +13,8 @@ PPROF_ENABLED=false
MONITORING_PORT=8000
MONGODB_URI=
MONGODB_DATABASE=
LOG_LEVEL=INFO
LOG_LEVEL=INFO
ETHEREUM_REQUESTS_PER_MINUTE=12
ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native
ETHEREUM_AUTH=

View File

@ -46,6 +46,12 @@ spec:
value: "{{ .MONITORING_PORT }}"
- name: LOG_LEVEL
value: "{{ .LOG_LEVEL }}"
- name: ETHEREUM_REQUESTS_PER_MINUTE
value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}"
- name: ETHEREUM_URL
value: "{{ .ETHEREUM_URL }}"
- name: ETHEREUM_AUTH
value: "{{ .ETHEREUM_AUTH }}"
- name: MONGODB_URI
valueFrom:
secretKeyRef:

View File

@ -0,0 +1,133 @@
package clients
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/ethereum/go-ethereum/common/hexutil"
)
type logsResponse struct {
Result []Log `json:"result"`
}
type Log struct {
Address string `json:"address"`
BlockHash string `json:"blockHash"`
BlockNumber string `json:"blockNumber"`
Data string `json:"data"`
Topics []string `json:"topics"`
TransactionHash string `json:"transactionHash"`
}
type EthRpcClient struct {
Url string
Auth string
}
// TODO add rate limits
func NewEthRpcClient(url string, auth string) *EthRpcClient {
return &EthRpcClient{Url: url, Auth: auth}
}
func (c *EthRpcClient) GetBlockNumber(ctx context.Context) (uint64, error) {
// Create a new HTTP request
payload := strings.NewReader(`{
"id": 1,
"jsonrpc": "2.0",
"method": "eth_blockNumber"
}`)
req, err := http.NewRequestWithContext(ctx, "POST", c.Url, payload)
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
// Add headers
req.Header.Add("accept", "application/json")
req.Header.Add("content-type", "application/json")
req.Header.Add("Authorization", "Bearer: "+c.Auth)
// Send the request
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to send HTTP request: %w", err)
}
defer res.Body.Close()
if res.Status != "200 OK" {
return 0, fmt.Errorf("encoutered unexpected HTTP status code in response: %s", res.Status)
}
// Deserialize response body
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, fmt.Errorf("failed to read HTTP response body: %w", err)
}
var response struct {
Result string `json:"result"`
}
if err := json.Unmarshal(body, &response); err != nil {
return 0, fmt.Errorf("failed to deserialize HTTP response body: %w", err)
}
// Parse the block number
n, err := hexutil.DecodeUint64(response.Result)
if err != nil {
return 0, fmt.Errorf("failed to parse block number from hex: %w", err)
}
return n, nil
}
func (c *EthRpcClient) GetLogs(
ctx context.Context,
fromBlock uint64,
toBlock uint64,
address string,
topic string,
) ([]Log, error) {
params := fmt.Sprintf(`{
"id": 1,
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [{
"address": ["%s"],
"fromBlock":"0x%x",
"toBlock":"0x%x",
"topics": ["%s"]
}]
}`, address, fromBlock, toBlock, topic)
payload := strings.NewReader(params)
req, err := http.NewRequest("POST", c.Url, payload)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Add("accept", "application/json")
req.Header.Add("content-type", "application/json")
req.Header.Add("Authorization", "Bearer: "+c.Auth)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send HTTP request: %w", err)
}
defer res.Body.Close()
// Deserialize response body
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to read HTTP response body: %w", err)
}
var response logsResponse
if err := json.Unmarshal(body, &response); err != nil {
return nil, fmt.Errorf("failed to deserialize HTTP response body: %w", err)
}
return response.Result, nil
}

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
@ -9,11 +10,13 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/settings"
"github.com/wormhole-foundation/wormhole-explorer/event-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/event-watcher/http"
"github.com/wormhole-foundation/wormhole-explorer/event-watcher/watchers"
"go.uber.org/zap"
)
@ -48,6 +51,11 @@ func main() {
)
server.Start()
// Start the watchers for each chain
if err := startWatchers(rootCtx, rootLogger, db, cfg); err != nil {
rootLogger.Fatal("Failed to start watchers", zap.Error(err))
}
// Block until we get a termination signal or the context is cancelled
rootLogger.Info("waiting for termination signal or context cancellation...")
sigterm := make(chan os.Signal, 1)
@ -66,3 +74,66 @@ func main() {
rootCtxCancel()
rootLogger.Info("terminated")
}
func startWatchers(
ctx context.Context,
logger *zap.Logger,
db *dbutil.Session,
cfg *config.ServiceSettings,
) error {
switch cfg.P2p.P2pNetwork {
case domain.P2pMainNet:
return startWatchersMainnet(ctx, logger, db, cfg)
case domain.P2pTestNet:
return startWatchersTestnet(ctx, logger, db, cfg)
default:
return fmt.Errorf("unknown p2p network: %s", cfg.P2p)
}
}
func startWatchersMainnet(
ctx context.Context,
logger *zap.Logger,
db *dbutil.Session,
cfg *config.ServiceSettings,
) error {
// Start Ethereum watcher
{
w := watchers.NewEvmWatcher(
logger,
db,
config.ETHEREUM_MAINNET.ContractAddress,
config.ETHEREUM_MAINNET.Topic,
cfg.EthereumUrl,
cfg.EthereumAuth,
)
w.Watch(ctx)
}
return nil
}
func startWatchersTestnet(
ctx context.Context,
logger *zap.Logger,
db *dbutil.Session,
cfg *config.ServiceSettings,
) error {
// Start Ethereum watcher
{
w := watchers.NewEvmWatcher(
logger,
db,
config.ETHEREUM_GOERLI.ContractAddress,
config.ETHEREUM_GOERLI.Topic,
cfg.EthereumUrl,
cfg.EthereumAuth,
)
w.Watch(ctx)
}
return nil
}

View File

@ -0,0 +1,19 @@
package config
type EvmParams struct {
StartingBlock uint64
ContractAddress string
Topic string
}
var ETHEREUM_MAINNET = EvmParams{
StartingBlock: 12_959_638,
ContractAddress: "0x98f3c9e6e3face36baad05fe09d375ef1464288b",
Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2",
}
var ETHEREUM_GOERLI = EvmParams{
StartingBlock: 5_896_171,
ContractAddress: "0x706abc4e45d419950511e474c7b9ed348a4a716c",
Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2",
}

View File

@ -9,4 +9,12 @@ type ServiceSettings struct {
settings.Logger
settings.MongoDB
settings.Monitoring
settings.P2p
WatcherSettings
}
type WatcherSettings struct {
EthereumRequestsPerMinute uint `split_words:"true" default:"INFO"`
EthereumUrl string `split_words:"true" default:"INFO"`
EthereumAuth string `split_words:"true" default:"INFO"`
}

View File

@ -0,0 +1,107 @@
package watchers
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/event-watcher/clients"
"go.uber.org/zap"
"golang.org/x/exp/constraints"
)
const bulkSize = 100
func min[T constraints.Ordered](a, b T) T {
if a < b {
return a
}
return b
}
type EvmWatcher struct {
logger *zap.Logger
db *dbutil.Session
client *clients.EthRpcClient
coreContractAddress string
logTopic string
}
func NewEvmWatcher(
logger *zap.Logger,
db *dbutil.Session,
coreContractAddress string,
logTopic string,
url string,
auth string,
) *EvmWatcher {
w := EvmWatcher{
logger: logger,
db: db,
client: clients.NewEthRpcClient(url, auth),
coreContractAddress: coreContractAddress,
logTopic: logTopic,
}
return &w
}
func (w *EvmWatcher) Watch(ctx context.Context) {
//TODO:
// - initialize current block in the database, if not already initialized.
// - get current block from database
var currentBlock uint64 = 0
for {
// Get the current blockchain head
latestBlock, err := w.client.GetBlockNumber(ctx)
if err != nil {
w.logger.Error("failed to get latest block number",
zap.String("url", w.client.Url),
zap.Error(err),
)
continue
}
// Process blocks in bulk
for currentBlock < latestBlock {
from := currentBlock
to := min(currentBlock+bulkSize, latestBlock)
w.processBlockRange(ctx, from, to)
currentBlock = latestBlock
}
}
}
func (w *EvmWatcher) processBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) {
var logs []clients.Log
var err error
// Retry until success
for {
logs, err = w.client.GetLogs(ctx, fromBlock, toBlock, w.coreContractAddress, w.logTopic)
if err != nil {
w.logger.Error("failed to get logs",
zap.String("url", w.client.Url),
zap.String("coreContractAddress", w.coreContractAddress),
zap.String("topic", w.logTopic),
zap.Uint64("fromBlock", fromBlock),
zap.Uint64("toBlock", toBlock),
zap.Error(err),
)
}
break
}
// Process logs
// TODO:
// - update current block in database
// - fire events for other services
for i := range logs {
log := logs[i]
w.logger.Info("found log", zap.String("transactionHash", log.TransactionHash))
}
}