Add slot to agent
This commit is contained in:
parent
9d6f8cde0e
commit
8f4127f781
|
@ -0,0 +1,112 @@
|
|||
package ethereum
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/certusone/wormhole/bridge/pkg/common"
|
||||
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
||||
"github.com/certusone/wormhole/bridge/pkg/vaa"
|
||||
)
|
||||
|
||||
type (
|
||||
SolanaBridgeWatcher struct {
|
||||
url string
|
||||
|
||||
pendingLocks map[string]*pendingLock
|
||||
pendingLocksGuard sync.Mutex
|
||||
|
||||
lockChan chan *common.ChainLock
|
||||
setChan chan *common.GuardianSet
|
||||
vaaChan chan *vaa.VAA
|
||||
}
|
||||
|
||||
pendingLock struct {
|
||||
lock *common.ChainLock
|
||||
}
|
||||
)
|
||||
|
||||
func NewSolanaBridgeWatcher(url string, lockEvents chan *common.ChainLock, setEvents chan *common.GuardianSet, vaaQueue chan *vaa.VAA) *SolanaBridgeWatcher {
|
||||
return &SolanaBridgeWatcher{url: url, lockChan: lockEvents, setChan: setEvents, pendingLocks: map[string]*pendingLock{}, vaaChan: vaaQueue}
|
||||
}
|
||||
|
||||
func (e *SolanaBridgeWatcher) Run(ctx context.Context) error {
|
||||
conn, err := grpc.Dial(e.url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to dial agent: %w", err)
|
||||
}
|
||||
c := agentv1.NewAgentClient(conn)
|
||||
|
||||
errC := make(chan error)
|
||||
logger := supervisor.Logger(ctx)
|
||||
//// Subscribe to new token lockups
|
||||
//tokensLockedSub, err := c.WatchLockups(ctx, &agentv1.WatchLockupsRequest{})
|
||||
//if err != nil {
|
||||
// return fmt.Errorf("failed to subscribe to token lockup events: %w", err)
|
||||
//}
|
||||
//
|
||||
//
|
||||
//go func() {
|
||||
// ev, err := tokensLockedSub.Recv()
|
||||
// for ; err == nil; ev, err = tokensLockedSub.Recv() {
|
||||
// switch event := ev.Event.(type) {
|
||||
// case *agentv1.LockupEvent_New:
|
||||
// lock := &common.ChainLock{
|
||||
// TxHash: eth_common.HexToHash(ev.TxHash),
|
||||
// SourceAddress: event.New.SourceAddress,
|
||||
// TargetAddress: event.New.TargetAddress,
|
||||
// SourceChain: vaa.ChainIDSolana,
|
||||
// TargetChain: vaa.ChainID(event.New.TargetChain),
|
||||
// TokenChain: vaa.ChainID(event.New.TokenChain),
|
||||
// TokenAddress: event.New.TokenAddress,
|
||||
// Amount: new(big.Int).SetBytes(event.New.Amount),
|
||||
// }
|
||||
//
|
||||
// logger.Info("found new lockup transaction", zap.String("tx", ev.TxHash))
|
||||
// e.pendingLocksGuard.Lock()
|
||||
// e.pendingLocks[ev.BlockHash] = &pendingLock{
|
||||
// lock: lock,
|
||||
// }
|
||||
// e.pendingLocksGuard.Unlock()
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if err != io.EOF {
|
||||
// errC <- err
|
||||
// }
|
||||
//}()
|
||||
|
||||
go func() {
|
||||
for v := range e.vaaChan {
|
||||
vaaBytes, err := v.Marshal()
|
||||
if err != nil {
|
||||
logger.Error("failed to marshal VAA", zap.Any("vaa", v), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
timeout, _ := context.WithTimeout(ctx, 15*time.Second)
|
||||
res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes})
|
||||
if err != nil {
|
||||
errC <- fmt.Errorf("failed to submit VAA: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("submitted VAA", zap.String("signature", res.Signature))
|
||||
}
|
||||
}()
|
||||
|
||||
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errC:
|
||||
return err
|
||||
}
|
||||
}
|
|
@ -25,10 +25,11 @@ message WatchLockupsRequest {
|
|||
}
|
||||
|
||||
message LockupEvent {
|
||||
uint64 slot = 1;
|
||||
oneof event {
|
||||
LockupEventNew new = 1;
|
||||
LockupEventVAAPosted vaaPosted = 2;
|
||||
Empty empty = 3;
|
||||
LockupEventNew new = 2;
|
||||
LockupEventVAAPosted vaaPosted = 3;
|
||||
Empty empty = 4;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,3 @@
|
|||
use std::env;
|
||||
use std::mem::size_of;
|
||||
use std::rc::Rc;
|
||||
use std::str::FromStr;
|
||||
use std::sync::mpsc::RecvError;
|
||||
use std::thread::sleep;
|
||||
|
||||
use solana_client::client_error::ClientError;
|
||||
use solana_client::rpc_client::RpcClient;
|
||||
use solana_sdk::fee_calculator::FeeCalculator;
|
||||
|
@ -26,6 +19,12 @@ use service::{
|
|||
};
|
||||
use spl_bridge::instruction::{post_vaa, CHAIN_ID_SOLANA};
|
||||
use spl_bridge::state::{Bridge, TransferOutProposal};
|
||||
use std::env;
|
||||
use std::mem::size_of;
|
||||
use std::rc::Rc;
|
||||
use std::str::FromStr;
|
||||
use std::sync::mpsc::RecvError;
|
||||
use std::thread::sleep;
|
||||
|
||||
use crate::monitor::{ProgramNotificationMessage, PubsubClient};
|
||||
|
||||
|
@ -131,6 +130,7 @@ impl Agent for AgentImpl {
|
|||
let event = if b.vaa_time == 0 {
|
||||
// The Lockup was created
|
||||
LockupEvent {
|
||||
slot: v.context.slot,
|
||||
event: Some(Event::New(LockupEventNew {
|
||||
nonce: b.nonce,
|
||||
source_chain: CHAIN_ID_SOLANA as u32,
|
||||
|
@ -145,6 +145,7 @@ impl Agent for AgentImpl {
|
|||
} else {
|
||||
// The VAA was submitted
|
||||
LockupEvent {
|
||||
slot: v.context.slot,
|
||||
event: Some(Event::VaaPosted(LockupEventVaaPosted {
|
||||
nonce: b.nonce,
|
||||
source_chain: CHAIN_ID_SOLANA as u32,
|
||||
|
@ -178,6 +179,7 @@ impl Agent for AgentImpl {
|
|||
// We need to keep the channel alive https://github.com/hyperium/tonic/issues/378
|
||||
loop {
|
||||
tx1.send(Ok(LockupEvent {
|
||||
slot: 0,
|
||||
event: Some(Event::Empty(Empty {})),
|
||||
}))
|
||||
.await;
|
||||
|
|
Loading…
Reference in New Issue