add rudimentary VAA sending

This commit is contained in:
Hendrik Hofstadt 2020-08-10 00:00:36 +02:00
parent 9749044c25
commit 70a1f24220
2 changed files with 85 additions and 13 deletions

View File

@ -10,11 +10,11 @@ message Empty {
}
message SubmitVAARequest {
bytes vaa = 1;
}
message SubmitVAAResponse {
string signature = 1;
}
message WatchLockupsRequest {

View File

@ -1,11 +1,18 @@
use std::env;
use std::mem::size_of;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::mpsc::RecvError;
use std::thread::sleep;
use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::instruction::Instruction;
use solana_sdk::program_error::ProgramError;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{read_keypair_file, Keypair, Signer};
use solana_sdk::transaction::Transaction;
use spl_token::state::Account;
use tokio::stream::Stream;
use tokio::sync::mpsc;
@ -15,9 +22,9 @@ use tonic::{transport::Server, Code, Request, Response, Status};
use service::agent_server::{Agent, AgentServer};
use service::{
lockup_event::Event, Empty, LockupEvent, LockupEventNew, LockupEventVaaPosted,
SubmitVaaRequest, SubmitVaaResponse, VaaPostedEvent, WatchLockupsRequest, WatchVaaRequest,
SubmitVaaRequest, SubmitVaaResponse, WatchLockupsRequest,
};
use spl_bridge::instruction::CHAIN_ID_SOLANA;
use spl_bridge::instruction::{post_vaa, CHAIN_ID_SOLANA};
use spl_bridge::state::{Bridge, TransferOutProposal};
use crate::monitor::{ProgramNotificationMessage, PubsubClient};
@ -28,9 +35,12 @@ pub mod service {
include!(concat!(env!("OUT_DIR"), concat!("/", "service", ".rs")));
}
#[derive(Default)]
pub struct AgentImpl {
url: String,
bridge: Pubkey,
rpc_url: String,
key: Keypair,
}
#[tonic::async_trait]
@ -39,26 +49,70 @@ impl Agent for AgentImpl {
&self,
request: Request<SubmitVaaRequest>,
) -> Result<Response<SubmitVaaResponse>, Status> {
println!("Got a request from {:?}", request.remote_addr());
// Hack to clone keypair
let b = self.key.to_bytes();
let key = Keypair::from_bytes(&b).unwrap();
let reply = SubmitVaaResponse {};
Ok(Response::new(reply))
let ix = match post_vaa(&self.bridge, &key.pubkey(), &request.get_ref().vaa) {
Ok(v) => v,
Err(e) => {
return Err(Status::new(
Code::InvalidArgument,
format!("could not create instruction: {}", e),
));
}
};
let mut transaction = Transaction::new_with_payer(&[ix], Some(&key.pubkey()));
let rpc_url = self.rpc_url.clone();
// we need to spawn an extra thread because tokio does not allow nested runtimes
std::thread::spawn(move || {
let rpc = RpcClient::new(rpc_url);
let (recent_blockhash, fee_calculator) = match rpc.get_recent_blockhash() {
Ok(v) => v,
Err(e) => {
return Err(Status::new(
Code::Unavailable,
format!("could not fetch recent blockhash: {}", e),
));
}
};
transaction.sign(&[&key], recent_blockhash);
match rpc.send_and_confirm_transaction(&transaction) {
Ok(s) => Ok(Response::new(SubmitVaaResponse {
signature: s.to_string(),
})),
Err(e) => Err(Status::new(
Code::Unavailable,
format!("tx sending failed: {}", e),
)),
}
})
.join()
.unwrap()
//check_fee_payer_balance(
// config,
// minimum_balance_for_rent_exemption
// + fee_calculator.calculate_fee(&transaction.message()),
//)?;
}
type WatchLockupsStream = mpsc::Receiver<Result<LockupEvent, Status>>;
async fn watch_lockups(
&self,
_: Request<WatchLockupsRequest>,
req: Request<WatchLockupsRequest>,
) -> Result<Response<Self::WatchLockupsStream>, Status> {
let (mut tx, rx) = mpsc::channel(1);
let mut tx1 = tx.clone();
let url = self.url.clone();
let bridge = self.bridge.clone();
// creating a new task
tokio::spawn(async move {
// looping and sending our response using stream
let sub =
PubsubClient::program_subscribe(&url, &Pubkey::from_str("").unwrap()).unwrap();
let sub = PubsubClient::program_subscribe(&url, &bridge).unwrap();
loop {
let item = sub.1.recv();
match item {
@ -136,9 +190,27 @@ impl Agent for AgentImpl {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let args: Vec<String> = env::args().collect();
// TODO use clap
if args.len() < 6 {
println!("<bridge> <rpc_host> <rpc_port> <ws_port> <port>");
return Ok(());
}
let bridge = &args[1];
let host = &args[2];
let rpc_port: u8 = args[3].parse()?;
let ws_port: u8 = args[4].parse()?;
let port: u8 = args[5].parse()?;
let addr = format!("[::1]:{}", port).parse().unwrap();
let agent = AgentImpl {
url: String::from("ws://localhost:8900"),
url: String::from(format!("ws://{}:{}", host, ws_port)),
rpc_url: format!("http://{}:{}", host, rpc_port),
bridge: Pubkey::from_str(bridge).unwrap(),
key: Keypair::new(), // TODO
};
println!("Agent listening on {}", addr);