Wire up Solana lockup watcher

This commit is contained in:
Leo 2020-08-21 20:49:33 +02:00
parent 86ccc1c617
commit 72289be8ee
5 changed files with 59 additions and 41 deletions

View File

@ -187,7 +187,7 @@ func main() {
return err return err
} }
if err := supervisor.Run(ctx, "solana", if err := supervisor.Run(ctx, "solwatch",
solana.NewSolanaBridgeWatcher(*agentRPC, lockC, solanaVaaC).Run); err != nil { solana.NewSolanaBridgeWatcher(*agentRPC, lockC, solanaVaaC).Run); err != nil {
return err return err
} }

View File

@ -75,6 +75,8 @@ func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.Guard
} }
} }
supervisor.Signal(ctx, supervisor.SignalHealthy)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -10,9 +10,9 @@ import (
) )
type ChainLock struct { type ChainLock struct {
TxHash common.Hash TxHash common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash
Timestamp time.Time Timestamp time.Time
Nonce uint32 Nonce uint32
SourceAddress vaa.Address SourceAddress vaa.Address

View File

@ -4,8 +4,10 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"math/big"
"time" "time"
eth_common "github.com/ethereum/go-ethereum/common"
"google.golang.org/grpc" "google.golang.org/grpc"
agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1" agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1"
@ -30,6 +32,16 @@ func NewSolanaBridgeWatcher(url string, lockEvents chan *common.ChainLock, vaaQu
return &SolanaBridgeWatcher{url: url, lockChan: lockEvents, vaaChan: vaaQueue} return &SolanaBridgeWatcher{url: url, lockChan: lockEvents, vaaChan: vaaQueue}
} }
// TODO: document/deduplicate
func padAddress(address eth_common.Address) vaa.Address {
paddedAddress := eth_common.LeftPadBytes(address[:], 32)
addr := vaa.Address{}
copy(addr[:], paddedAddress)
return addr
}
func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
timeout, _ := context.WithTimeout(ctx, 15*time.Second) timeout, _ := context.WithTimeout(ctx, 15*time.Second)
conn, err := grpc.DialContext(timeout, e.url, grpc.WithBlock(), grpc.WithInsecure()) conn, err := grpc.DialContext(timeout, e.url, grpc.WithBlock(), grpc.WithInsecure())
@ -43,42 +55,42 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
errC := make(chan error) errC := make(chan error)
logger := supervisor.Logger(ctx) logger := supervisor.Logger(ctx)
//// Subscribe to new token lockups // Subscribe to new token lockups
//tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{}) tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{})
//if err != nil { if err != nil {
// return fmt.Errorf("failed to subscribe to token lockup events: %w", err) return fmt.Errorf("failed to subscribe to token lockup events: %w", err)
//} }
//
//go func() { go func() {
// // TODO: does this properly terminate on ctx cancellation? logger.Info("watching for on-chain events")
// ev, err := tokensLockedSub.Recv()
// for ; err == nil; ev, err = tokensLockedSub.Recv() { for {
// switch event := ev.Event.(type) { ev, err := tokensLockedSub.Recv()
// case *agentv1.LockupEvent_New: if err != nil {
// lock := &common.ChainLock{ errC <- err
// TxHash: eth_common.HexToHash(ev.TxHash), return
// SourceAddress: event.New.SourceAddress, }
// TargetAddress: event.New.TargetAddress,
// SourceChain: vaa.ChainIDSolana, switch event := ev.Event.(type) {
// TargetChain: vaa.ChainID(event.New.TargetChain), case *agentv1.LockupEvent_New:
// TokenChain: vaa.ChainID(event.New.TokenChain), lock := &common.ChainLock{
// TokenAddress: event.New.TokenAddress, TxHash: eth_common.HexToHash(ev.LockupAddress),
// Amount: new(big.Int).SetBytes(event.New.Amount), Timestamp: time.Time{}, // FIXME
// } Nonce: event.New.Nonce,
// SourceAddress: padAddress(eth_common.BytesToAddress(event.New.SourceAddress)),
// logger.Info("found new lockup transaction", zap.String("tx", ev.TxHash)) TargetAddress: padAddress(eth_common.BytesToAddress(event.New.TargetAddress)),
// e.pendingLocksGuard.Lock() SourceChain: vaa.ChainIDSolana,
// e.pendingLocks[ev.BlockHash] = &pendingLock{ TargetChain: vaa.ChainID(event.New.TargetChain),
// lock: lock, TokenChain: vaa.ChainID(event.New.TokenChain),
// } TokenAddress: padAddress(eth_common.BytesToAddress(event.New.TokenAddress)),
// e.pendingLocksGuard.Unlock() Amount: new(big.Int).SetBytes(event.New.Amount),
// } }
// }
// e.lockChan <- lock
// if err != io.EOF { logger.Info("found new lockup transaction", zap.String("lockup_address", ev.LockupAddress))
// errC <- err }
// } }
//}() }()
go func() { go func() {
for { for {
@ -106,7 +118,7 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
} }
logger.Info("submitted VAA", logger.Info("submitted VAA",
zap.String("signature", res.Signature), zap.String("digest", h)) zap.String("tx_sig", res.Signature), zap.String("digest", h))
} }
} }
}() }()

View File

@ -2,6 +2,8 @@ syntax = "proto3";
package agent.v1; package agent.v1;
// TODO: documentation
option go_package = "proto/agent/v1;agentv1"; option go_package = "proto/agent/v1;agentv1";
service Agent { service Agent {
@ -26,7 +28,7 @@ message WatchLockupsRequest {
message LockupEvent { message LockupEvent {
uint64 slot = 1; uint64 slot = 1;
string lockupAddress = 2; string lockupAddress = 2; // TODO: why is this a string?
uint64 time = 3; uint64 time = 3;
oneof event { oneof event {
LockupEventNew new = 4; LockupEventNew new = 4;
@ -35,6 +37,7 @@ message LockupEvent {
} }
} }
// Token on Solana was locked or burned.
message LockupEventNew { message LockupEventNew {
uint32 nonce = 1; uint32 nonce = 1;
uint32 sourceChain = 2; uint32 sourceChain = 2;
@ -46,6 +49,7 @@ message LockupEventNew {
bytes amount = 8; bytes amount = 8;
} }
// A VAA was posted to Solana for data availability.
message LockupEventVAAPosted { message LockupEventVAAPosted {
uint32 nonce = 1; uint32 nonce = 1;
uint32 sourceChain = 2; uint32 sourceChain = 2;