diff --git a/bridge/cmd/guardiand/main.go b/bridge/cmd/guardiand/main.go index 8f2efbfb..d4c41422 100644 --- a/bridge/cmd/guardiand/main.go +++ b/bridge/cmd/guardiand/main.go @@ -187,7 +187,7 @@ func main() { return err } - if err := supervisor.Run(ctx, "solana", + if err := supervisor.Run(ctx, "solwatch", solana.NewSolanaBridgeWatcher(*agentRPC, lockC, solanaVaaC).Run); err != nil { return err } diff --git a/bridge/cmd/guardiand/processor.go b/bridge/cmd/guardiand/processor.go index d861104d..7340f671 100644 --- a/bridge/cmd/guardiand/processor.go +++ b/bridge/cmd/guardiand/processor.go @@ -75,6 +75,8 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard } } + supervisor.Signal(ctx, supervisor.SignalHealthy) + for { select { case <-ctx.Done(): diff --git a/bridge/pkg/common/chainlock.go b/bridge/pkg/common/chainlock.go index 54ecda81..0f7681c6 100644 --- a/bridge/pkg/common/chainlock.go +++ b/bridge/pkg/common/chainlock.go @@ -10,9 +10,9 @@ import ( ) type ChainLock struct { - TxHash common.Hash + TxHash common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash Timestamp time.Time - + Nonce uint32 SourceAddress vaa.Address diff --git a/bridge/pkg/solana/watcher.go b/bridge/pkg/solana/watcher.go index 57569382..b02e7ed1 100644 --- a/bridge/pkg/solana/watcher.go +++ b/bridge/pkg/solana/watcher.go @@ -4,8 +4,10 @@ import ( "context" "encoding/hex" "fmt" + "math/big" "time" + eth_common "github.com/ethereum/go-ethereum/common" "google.golang.org/grpc" agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1" @@ -30,6 +32,16 @@ func NewSolanaBridgeWatcher(url string, lockEvents chan *common.ChainLock, vaaQu return &SolanaBridgeWatcher{url: url, lockChan: lockEvents, vaaChan: vaaQueue} } +// TODO: document/deduplicate +func padAddress(address eth_common.Address) vaa.Address { + paddedAddress := eth_common.LeftPadBytes(address[:], 32) + + addr := vaa.Address{} + copy(addr[:], paddedAddress) + + return addr +} + func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { timeout, _ := context.WithTimeout(ctx, 15*time.Second) conn, err := grpc.DialContext(timeout, e.url, grpc.WithBlock(), grpc.WithInsecure()) @@ -43,42 +55,42 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { errC := make(chan error) logger := supervisor.Logger(ctx) - //// Subscribe to new token lockups - //tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{}) - //if err != nil { - // return fmt.Errorf("failed to subscribe to token lockup events: %w", err) - //} - // - //go func() { - // // TODO: does this properly terminate on ctx cancellation? - // ev, err := tokensLockedSub.Recv() - // for ; err == nil; ev, err = tokensLockedSub.Recv() { - // switch event := ev.Event.(type) { - // case *agentv1.LockupEvent_New: - // lock := &common.ChainLock{ - // TxHash: eth_common.HexToHash(ev.TxHash), - // SourceAddress: event.New.SourceAddress, - // TargetAddress: event.New.TargetAddress, - // SourceChain: vaa.ChainIDSolana, - // TargetChain: vaa.ChainID(event.New.TargetChain), - // TokenChain: vaa.ChainID(event.New.TokenChain), - // TokenAddress: event.New.TokenAddress, - // Amount: new(big.Int).SetBytes(event.New.Amount), - // } - // - // logger.Info("found new lockup transaction", zap.String("tx", ev.TxHash)) - // e.pendingLocksGuard.Lock() - // e.pendingLocks[ev.BlockHash] = &pendingLock{ - // lock: lock, - // } - // e.pendingLocksGuard.Unlock() - // } - // } - // - // if err != io.EOF { - // errC <- err - // } - //}() + // Subscribe to new token lockups + tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{}) + if err != nil { + return fmt.Errorf("failed to subscribe to token lockup events: %w", err) + } + + go func() { + logger.Info("watching for on-chain events") + + for { + ev, err := tokensLockedSub.Recv() + if err != nil { + errC <- err + return + } + + switch event := ev.Event.(type) { + case *agentv1.LockupEvent_New: + lock := &common.ChainLock{ + TxHash: eth_common.HexToHash(ev.LockupAddress), + Timestamp: time.Time{}, // FIXME + Nonce: event.New.Nonce, + SourceAddress: padAddress(eth_common.BytesToAddress(event.New.SourceAddress)), + TargetAddress: padAddress(eth_common.BytesToAddress(event.New.TargetAddress)), + SourceChain: vaa.ChainIDSolana, + TargetChain: vaa.ChainID(event.New.TargetChain), + TokenChain: vaa.ChainID(event.New.TokenChain), + TokenAddress: padAddress(eth_common.BytesToAddress(event.New.TokenAddress)), + Amount: new(big.Int).SetBytes(event.New.Amount), + } + + e.lockChan <- lock + logger.Info("found new lockup transaction", zap.String("lockup_address", ev.LockupAddress)) + } + } + }() go func() { for { @@ -106,7 +118,7 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { } logger.Info("submitted VAA", - zap.String("signature", res.Signature), zap.String("digest", h)) + zap.String("tx_sig", res.Signature), zap.String("digest", h)) } } }() diff --git a/proto/agent/v1/service.proto b/proto/agent/v1/service.proto index 80cee37c..177adf47 100644 --- a/proto/agent/v1/service.proto +++ b/proto/agent/v1/service.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package agent.v1; +// TODO: documentation + option go_package = "proto/agent/v1;agentv1"; service Agent { @@ -26,7 +28,7 @@ message WatchLockupsRequest { message LockupEvent { uint64 slot = 1; - string lockupAddress = 2; + string lockupAddress = 2; // TODO: why is this a string? uint64 time = 3; oneof event { LockupEventNew new = 4; @@ -35,6 +37,7 @@ message LockupEvent { } } +// Token on Solana was locked or burned. message LockupEventNew { uint32 nonce = 1; uint32 sourceChain = 2; @@ -46,6 +49,7 @@ message LockupEventNew { bytes amount = 8; } +// A VAA was posted to Solana for data availability. message LockupEventVAAPosted { uint32 nonce = 1; uint32 sourceChain = 2;