199 lines
6.3 KiB
Go
199 lines
6.3 KiB
Go
package accountant
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/common"
|
|
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
|
|
|
ethCommon "github.com/ethereum/go-ethereum/common"
|
|
|
|
tmAbci "github.com/tendermint/tendermint/abci/types"
|
|
tmHttp "github.com/tendermint/tendermint/rpc/client/http"
|
|
tmCoreTypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
tmTypes "github.com/tendermint/tendermint/types"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// watcher reads transaction events from the smart contract and publishes them.
|
|
func (acct *Accountant) watcher(ctx context.Context) error {
|
|
errC := make(chan error)
|
|
|
|
acct.logger.Info("acctwatch: creating watcher", zap.String("url", acct.wsUrl), zap.String("contract", acct.contract))
|
|
tmConn, err := tmHttp.New(acct.wsUrl, "/websocket")
|
|
if err != nil {
|
|
connectionErrors.Inc()
|
|
return fmt.Errorf("failed to establish tendermint connection: %w", err)
|
|
}
|
|
|
|
if err := tmConn.Start(); err != nil {
|
|
connectionErrors.Inc()
|
|
return fmt.Errorf("failed to start tendermint connection: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := tmConn.Stop(); err != nil {
|
|
connectionErrors.Inc()
|
|
acct.logger.Error("acctwatch: failed to stop tendermint connection", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
query := fmt.Sprintf("execute._contract_address='%s'", acct.contract)
|
|
events, err := tmConn.Subscribe(
|
|
ctx,
|
|
"guardiand",
|
|
query,
|
|
64, // channel capacity
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to accountant events: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := tmConn.UnsubscribeAll(ctx, "guardiand"); err != nil {
|
|
acct.logger.Error("acctwatch: failed to unsubscribe from events", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
go acct.handleEvents(ctx, events, errC)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-errC:
|
|
return err
|
|
}
|
|
}
|
|
|
|
// handleEvents handles events from the tendermint client library.
|
|
func (acct *Accountant) handleEvents(ctx context.Context, evts <-chan tmCoreTypes.ResultEvent, errC chan error) {
|
|
defer close(errC)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case e := <-evts:
|
|
tx, ok := e.Data.(tmTypes.EventDataTx)
|
|
if !ok {
|
|
acct.logger.Error("acctwatcher: unknown data from event subscription", zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", e))
|
|
continue
|
|
}
|
|
|
|
for _, event := range tx.Result.Events {
|
|
if event.Type == "wasm-Transfer" {
|
|
xfer, err := parseWasmTransfer(acct.logger, event, acct.contract)
|
|
if err != nil {
|
|
acct.logger.Error("acctwatcher: failed to parse wasm event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event))
|
|
continue
|
|
}
|
|
|
|
eventsReceived.Inc()
|
|
acct.processPendingTransfer(xfer)
|
|
} else {
|
|
acct.logger.Debug("acctwatcher: ignoring non-transfer event", zap.String("eventType", event.Type))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WasmTransfer represents a transfer event from the smart contract.
|
|
type WasmTransfer struct {
|
|
TxHashBytes []byte `json:"tx_hash"`
|
|
Timestamp uint32 `json:"timestamp"`
|
|
Nonce uint32 `json:"nonce"`
|
|
EmitterChain uint16 `json:"emitter_chain"`
|
|
EmitterAddress vaa.Address `json:"emitter_address"`
|
|
Sequence uint64 `json:"sequence"`
|
|
ConsistencyLevel uint8 `json:"consistency_level"`
|
|
Payload []byte `json:"payload"`
|
|
}
|
|
|
|
// parseWasmTransfer parses transfer events from the smart contract. All other event types are ignored.
|
|
func parseWasmTransfer(logger *zap.Logger, event tmAbci.Event, contractAddress string) (*WasmTransfer, error) {
|
|
if event.Type != "wasm-Transfer" {
|
|
return nil, fmt.Errorf("not a WasmTransfer event: %s", event.Type)
|
|
}
|
|
|
|
attrs := make(map[string]json.RawMessage)
|
|
for _, attr := range event.Attributes {
|
|
if string(attr.Key) == "_contract_address" {
|
|
if string(attr.Value) != contractAddress {
|
|
return nil, fmt.Errorf("WasmTransfer event from unexpected contract: %s", string(attr.Value))
|
|
}
|
|
} else {
|
|
logger.Debug("acctwatcher: attribute", zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value)))
|
|
attrs[string(attr.Key)] = attr.Value
|
|
}
|
|
}
|
|
|
|
attrBytes, err := json.Marshal(attrs)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal event attributes: %w", err)
|
|
}
|
|
|
|
evt := new(WasmTransfer)
|
|
if err := json.Unmarshal(attrBytes, evt); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal WasmTransfer event: %w", err)
|
|
}
|
|
|
|
return evt, nil
|
|
}
|
|
|
|
// processPendingTransfer takes a WasmTransfer event, determines if we are expecting it, and if so, publishes it.
|
|
func (acct *Accountant) processPendingTransfer(xfer *WasmTransfer) {
|
|
acct.logger.Info("acctwatch: transfer event detected",
|
|
zap.String("tx_hash", hex.EncodeToString(xfer.TxHashBytes)),
|
|
zap.Uint32("timestamp", xfer.Timestamp),
|
|
zap.Uint32("nonce", xfer.Nonce),
|
|
zap.Stringer("emitter_chain", vaa.ChainID(xfer.EmitterChain)),
|
|
zap.Stringer("emitter_address", xfer.EmitterAddress),
|
|
zap.Uint64("sequence", xfer.Sequence),
|
|
zap.Uint8("consistency_level", xfer.ConsistencyLevel),
|
|
zap.String("payload", hex.EncodeToString(xfer.Payload)),
|
|
)
|
|
|
|
msg := &common.MessagePublication{
|
|
TxHash: ethCommon.BytesToHash(xfer.TxHashBytes),
|
|
Timestamp: time.Unix(int64(xfer.Timestamp), 0),
|
|
Nonce: xfer.Nonce,
|
|
Sequence: xfer.Sequence,
|
|
EmitterChain: vaa.ChainID(xfer.EmitterChain),
|
|
EmitterAddress: xfer.EmitterAddress,
|
|
Payload: xfer.Payload,
|
|
ConsistencyLevel: xfer.ConsistencyLevel,
|
|
}
|
|
|
|
msgId := msg.MessageIDString()
|
|
|
|
acct.pendingTransfersLock.Lock()
|
|
defer acct.pendingTransfersLock.Unlock()
|
|
|
|
pe, exists := acct.pendingTransfers[msgId]
|
|
if exists {
|
|
digest := msg.CreateDigest()
|
|
if pe.digest != digest {
|
|
digestMismatches.Inc()
|
|
acct.logger.Error("acctwatch: digest mismatch, dropping transfer",
|
|
zap.String("msgID", msgId),
|
|
zap.String("oldDigest", pe.digest),
|
|
zap.String("newDigest", digest),
|
|
)
|
|
|
|
acct.deletePendingTransfer(msgId)
|
|
return
|
|
}
|
|
acct.logger.Info("acctwatch: pending transfer has been approved", zap.String("msgId", msgId))
|
|
acct.publishTransfer(pe)
|
|
transfersApproved.Inc()
|
|
} else {
|
|
// TODO: We could issue a reobservation request here since it looks like other guardians have seen this transfer but we haven't.
|
|
acct.logger.Info("acctwatch: unknown transfer has been approved, ignoring it", zap.String("msgId", msgId))
|
|
}
|
|
}
|