From 8f4127f7813a8d3602c19a6f6bebfecc73738770 Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Thu, 20 Aug 2020 19:20:11 +0200 Subject: [PATCH] Add slot to agent --- bridge/pkg/solana/watcher.go | 112 +++++++++++++++++++++++++++++++++++ proto/agent/v1/service.proto | 7 ++- solana/agent/src/main.rs | 16 ++--- 3 files changed, 125 insertions(+), 10 deletions(-) create mode 100644 bridge/pkg/solana/watcher.go diff --git a/bridge/pkg/solana/watcher.go b/bridge/pkg/solana/watcher.go new file mode 100644 index 00000000..bf89a210 --- /dev/null +++ b/bridge/pkg/solana/watcher.go @@ -0,0 +1,112 @@ +package ethereum + +import ( + "context" + "fmt" + agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1" + "google.golang.org/grpc" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/certusone/wormhole/bridge/pkg/common" + "github.com/certusone/wormhole/bridge/pkg/supervisor" + "github.com/certusone/wormhole/bridge/pkg/vaa" +) + +type ( + SolanaBridgeWatcher struct { + url string + + pendingLocks map[string]*pendingLock + pendingLocksGuard sync.Mutex + + lockChan chan *common.ChainLock + setChan chan *common.GuardianSet + vaaChan chan *vaa.VAA + } + + pendingLock struct { + lock *common.ChainLock + } +) + +func NewSolanaBridgeWatcher(url string, lockEvents chan *common.ChainLock, setEvents chan *common.GuardianSet, vaaQueue chan *vaa.VAA) *SolanaBridgeWatcher { + return &SolanaBridgeWatcher{url: url, lockChan: lockEvents, setChan: setEvents, pendingLocks: map[string]*pendingLock{}, vaaChan: vaaQueue} +} + +func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { + conn, err := grpc.Dial(e.url) + if err != nil { + return fmt.Errorf("failed to dial agent: %w", err) + } + c := agentv1.NewAgentClient(conn) + + errC := make(chan error) + logger := supervisor.Logger(ctx) + //// Subscribe to new token lockups + //tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{}) + //if err != nil { + // return fmt.Errorf("failed to subscribe to token lockup events: %w", err) + //} + // + // + //go func() { + // ev, err := tokensLockedSub.Recv() + // for ; err == nil; ev, err = tokensLockedSub.Recv() { + // switch event := ev.Event.(type) { + // case *agentv1.LockupEvent_New: + // lock := &common.ChainLock{ + // TxHash: eth_common.HexToHash(ev.TxHash), + // SourceAddress: event.New.SourceAddress, + // TargetAddress: event.New.TargetAddress, + // SourceChain: vaa.ChainIDSolana, + // TargetChain: vaa.ChainID(event.New.TargetChain), + // TokenChain: vaa.ChainID(event.New.TokenChain), + // TokenAddress: event.New.TokenAddress, + // Amount: new(big.Int).SetBytes(event.New.Amount), + // } + // + // logger.Info("found new lockup transaction", zap.String("tx", ev.TxHash)) + // e.pendingLocksGuard.Lock() + // e.pendingLocks[ev.BlockHash] = &pendingLock{ + // lock: lock, + // } + // e.pendingLocksGuard.Unlock() + // } + // } + // + // if err != io.EOF { + // errC <- err + // } + //}() + + go func() { + for v := range e.vaaChan { + vaaBytes, err := v.Marshal() + if err != nil { + logger.Error("failed to marshal VAA", zap.Any("vaa", v), zap.Error(err)) + continue + } + + timeout, _ := context.WithTimeout(ctx, 15*time.Second) + res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes}) + if err != nil { + errC <- fmt.Errorf("failed to submit VAA: %w", err) + return + } + + logger.Debug("submitted VAA", zap.String("signature", res.Signature)) + } + }() + + supervisor.Signal(ctx, supervisor.SignalHealthy) + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err + } +} diff --git a/proto/agent/v1/service.proto b/proto/agent/v1/service.proto index fa2ad671..e1e69ac6 100644 --- a/proto/agent/v1/service.proto +++ b/proto/agent/v1/service.proto @@ -25,10 +25,11 @@ message WatchLockupsRequest { } message LockupEvent { + uint64 slot = 1; oneof event { - LockupEventNew new = 1; - LockupEventVAAPosted vaaPosted = 2; - Empty empty = 3; + LockupEventNew new = 2; + LockupEventVAAPosted vaaPosted = 3; + Empty empty = 4; } } diff --git a/solana/agent/src/main.rs b/solana/agent/src/main.rs index d3f3bbac..dde21e4d 100644 --- a/solana/agent/src/main.rs +++ b/solana/agent/src/main.rs @@ -1,10 +1,3 @@ -use std::env; -use std::mem::size_of; -use std::rc::Rc; -use std::str::FromStr; -use std::sync::mpsc::RecvError; -use std::thread::sleep; - use solana_client::client_error::ClientError; use solana_client::rpc_client::RpcClient; use solana_sdk::fee_calculator::FeeCalculator; @@ -26,6 +19,12 @@ use service::{ }; use spl_bridge::instruction::{post_vaa, CHAIN_ID_SOLANA}; use spl_bridge::state::{Bridge, TransferOutProposal}; +use std::env; +use std::mem::size_of; +use std::rc::Rc; +use std::str::FromStr; +use std::sync::mpsc::RecvError; +use std::thread::sleep; use crate::monitor::{ProgramNotificationMessage, PubsubClient}; @@ -131,6 +130,7 @@ impl Agent for AgentImpl { let event = if b.vaa_time == 0 { // The Lockup was created LockupEvent { + slot: v.context.slot, event: Some(Event::New(LockupEventNew { nonce: b.nonce, source_chain: CHAIN_ID_SOLANA as u32, @@ -145,6 +145,7 @@ impl Agent for AgentImpl { } else { // The VAA was submitted LockupEvent { + slot: v.context.slot, event: Some(Event::VaaPosted(LockupEventVaaPosted { nonce: b.nonce, source_chain: CHAIN_ID_SOLANA as u32, @@ -178,6 +179,7 @@ impl Agent for AgentImpl { // We need to keep the channel alive https://github.com/hyperium/tonic/issues/378 loop { tx1.send(Ok(LockupEvent { + slot: 0, event: Some(Event::Empty(Empty {})), })) .await;