add watch-only version of the rust agent

This commit is contained in:
Hendrik Hofstadt 2020-08-08 21:32:33 +02:00
parent 3b96e0cc6d
commit f158bb4b5f
14 changed files with 10800 additions and 5 deletions

3649
solana/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

2
solana/Cargo.toml Normal file
View File

@ -0,0 +1,2 @@
[workspace]
members = ["agent", "bridge", "cli"]

1038
solana/agent/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

31
solana/agent/Cargo.toml Normal file
View File

@ -0,0 +1,31 @@
[package]
name = "agent"
version = "0.1.0"
authors = ["Hendrik Hofstadt <hendrik@nexantic.com>"]
edition = "2018"
[dependencies]
tonic = "0.3.0"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
prost = "0.6"
prost-types = "0.6"
solana-sdk = { version = "1.3.1" }
solana-client = { version = "1.3.1" }
solana-faucet = "1.3.1"
solana-transaction-status = "1.3.1"
spl-token = { package = "spl-token", git = "https://github.com/solana-labs/solana-program-library" }
wormhole-bridge = { path = "../bridge" }
primitive-types = {version ="0.7.2"}
hex = "0.4.2"
thiserror = "1.0.20"
tungstenite = "0.11.1"
serde = "1.0.103"
url="2.1.1"
serde_bytes ="0.11.5"
log ="0.4.8"
serde_derive = "1.0.103"
serde_json = "1.0.57"
bs58 = "0.3.1"
[build-dependencies]
tonic-build = { version = "0.3.0", features = ["prost"] }

3
solana/agent/build.rs Normal file
View File

@ -0,0 +1,3 @@
fn main() {
tonic_build::compile_protos("proto/service.proto").unwrap();
}

View File

@ -0,0 +1,54 @@
syntax = "proto3";
package service;
service Agent {
rpc SubmitVAA (SubmitVAARequest) returns (SubmitVAAResponse);
rpc WatchLockups (WatchLockupsRequest) returns (stream LockupEvent);
}
message Empty {
}
message SubmitVAARequest {
}
message SubmitVAAResponse {
}
message WatchLockupsRequest {
}
message LockupEvent {
oneof event {
LockupEventNew new = 1;
LockupEventVAAPosted vaaPosted = 2;
Empty empty = 3;
}
}
message LockupEventNew {
uint32 nonce = 1;
uint32 sourceChain = 2;
uint32 targetChain = 3;
bytes sourceAddress = 4;
bytes targetAddress = 5;
uint32 tokenChain = 6;
bytes tokenAddress = 7;
bytes amount = 8;
}
message LockupEventVAAPosted {
uint32 nonce = 1;
uint32 sourceChain = 2;
uint32 targetChain = 3;
bytes sourceAddress = 4;
bytes targetAddress = 5;
uint32 tokenChain = 6;
bytes tokenAddress = 7;
bytes amount = 8;
bytes vaa = 9;
}

152
solana/agent/src/main.rs Normal file
View File

@ -0,0 +1,152 @@
use std::mem::size_of;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::mpsc::RecvError;
use std::thread::sleep;
use solana_sdk::program_error::ProgramError;
use solana_sdk::pubkey::Pubkey;
use spl_token::state::Account;
use tokio::stream::Stream;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tonic::{transport::Server, Code, Request, Response, Status};
use service::agent_server::{Agent, AgentServer};
use service::{
lockup_event::Event, Empty, LockupEvent, LockupEventNew, LockupEventVaaPosted,
SubmitVaaRequest, SubmitVaaResponse, VaaPostedEvent, WatchLockupsRequest, WatchVaaRequest,
};
use spl_bridge::instruction::CHAIN_ID_SOLANA;
use spl_bridge::state::{Bridge, TransferOutProposal};
use crate::monitor::{ProgramNotificationMessage, PubsubClient};
mod monitor;
pub mod service {
include!(concat!(env!("OUT_DIR"), concat!("/", "service", ".rs")));
}
#[derive(Default)]
pub struct AgentImpl {
url: String,
}
#[tonic::async_trait]
impl Agent for AgentImpl {
async fn submit_vaa(
&self,
request: Request<SubmitVaaRequest>,
) -> Result<Response<SubmitVaaResponse>, Status> {
println!("Got a request from {:?}", request.remote_addr());
let reply = SubmitVaaResponse {};
Ok(Response::new(reply))
}
type WatchLockupsStream = mpsc::Receiver<Result<LockupEvent, Status>>;
async fn watch_lockups(
&self,
_: Request<WatchLockupsRequest>,
) -> Result<Response<Self::WatchLockupsStream>, Status> {
let (mut tx, rx) = mpsc::channel(1);
let mut tx1 = tx.clone();
let url = self.url.clone();
// creating a new task
tokio::spawn(async move {
// looping and sending our response using stream
let sub =
PubsubClient::program_subscribe(&url, &Pubkey::from_str("").unwrap()).unwrap();
loop {
let item = sub.1.recv();
match item {
Ok(v) => {
//
let b = match Bridge::unpack_immutable::<TransferOutProposal>(
v.value.account.data.as_slice(),
) {
Ok(v) => v,
Err(_) => continue,
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
let event = if b.vaa_time == 0 {
// The Lockup was created
LockupEvent {
event: Some(Event::New(LockupEventNew {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
})),
}
} else {
// The VAA was submitted
LockupEvent {
event: Some(Event::VaaPosted(LockupEventVaaPosted {
nonce: b.nonce,
source_chain: CHAIN_ID_SOLANA as u32,
target_chain: b.to_chain_id as u32,
source_address: b.source_address.to_vec(),
target_address: b.foreign_address.to_vec(),
token_chain: b.asset.chain as u32,
token_address: b.asset.address.to_vec(),
amount: amount_b.to_vec(),
vaa: b.vaa.to_vec(),
})),
}
};
let mut amount_b: [u8; 32] = [0; 32];
b.amount.to_big_endian(&mut amount_b);
if let Err(_) = tx.send(Ok(event)).await {
return;
};
}
Err(_) => {
tx.send(Err(Status::new(Code::Aborted, "watcher died")))
.await;
return;
}
};
}
});
tokio::spawn(async move {
// We need to keep the channel alive https://github.com/hyperium/tonic/issues/378
loop {
tx1.send(Ok(LockupEvent {
event: Some(Event::Empty(Empty {})),
}))
.await;
sleep(Duration::new(1, 0))
}
});
Ok(Response::new(rx))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let agent = AgentImpl {
url: String::from("ws://localhost:8900"),
};
println!("Agent listening on {}", addr);
Server::builder()
.add_service(AgentServer::new(agent))
.serve(addr)
.await?;
Ok(())
}

272
solana/agent/src/monitor.rs Normal file
View File

@ -0,0 +1,272 @@
use std::str::FromStr;
use std::{
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver},
Arc, RwLock,
},
thread::JoinHandle,
};
use bs58;
use log::*;
use serde::{de::DeserializeOwned, de::Error, Deserialize, Deserializer, Serialize};
use serde_json::{
json,
value::Value::{Number, Object},
Map, Value,
};
use solana_sdk::account::Account;
use solana_sdk::account_info::AccountInfo;
use solana_sdk::pubkey::Pubkey;
use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url};
#[derive(Debug, Error)]
pub enum PubsubClientError {
#[error("url parse error")]
UrlParseError(#[from] ParseError),
#[error("unable to connect to server")]
ConnectionError(#[from] tungstenite::Error),
#[error("json parse error")]
JsonParseError(#[from] serde_json::error::Error),
#[error("unexpected message format")]
UnexpectedMessageError,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct ProgramUpdate {
#[serde(deserialize_with = "from_bs58")]
pub pubkey: Pubkey,
pub account: ProgramAccount,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ProgramAccount {
/// lamports in the account
pub lamports: u64,
/// data held in this account
#[serde(deserialize_with = "bytes_from_bs58")]
pub data: Vec<u8>,
/// the program that owns this account. If executable, the program that loads this account.
#[serde(deserialize_with = "from_bs58")]
pub owner: Pubkey,
/// this account's data contains a loaded program (and is now read-only)
pub executable: bool,
/// the epoch at which this account will next owe rent
pub rent_epoch: u64,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct ProgramUpdateContext {
pub slot: u64,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct ProgramNotificationMessage {
pub value: ProgramUpdate,
pub context: ProgramUpdateContext,
}
pub struct PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
message_type: PhantomData<T>,
operation: &'static str,
socket: Arc<RwLock<WebSocket<AutoStream>>>,
subscription_id: u64,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
}
impl<T> Drop for PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
fn drop(&mut self) {
self.send_unsubscribe()
.unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
self.socket
.write()
.unwrap()
.close(None)
.unwrap_or_else(|_| warn!("unable to close websocket"));
}
}
impl<T> PubsubClientSubscription<T>
where
T: DeserializeOwned,
{
fn send_subscribe(
writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>,
operation: &str,
program: &Pubkey,
) -> Result<u64, PubsubClientError> {
let method = format!("{}Subscribe", operation);
writable_socket
.write()
.unwrap()
.write_message(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":method,"params":[program.to_string(),{"encoding":"binary"}]
})
.to_string(),
))?;
let message = writable_socket.write().unwrap().read_message()?;
Self::extract_subscription_id(message)
}
fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
let message_text = &message.into_text()?;
let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
if let Some(Number(x)) = json_msg.get("result") {
if let Some(x) = x.as_u64() {
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
}
pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
let method = format!("{}Unubscribe", self.operation);
self.socket
.write()
.unwrap()
.write_message(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
})
.to_string(),
))
.map_err(|err| err.into())
}
fn read_message(
writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>,
) -> Result<T, PubsubClientError> {
let message = writable_socket.write().unwrap().read_message()?;
let message_text = &message.into_text().unwrap();
let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
if let Some(Object(value_1)) = json_msg.get("params") {
if let Some(value_2) = value_1.get("result") {
let x: T = serde_json::from_value::<T>(value_2.clone()).unwrap();
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
}
pub fn shutdown(&mut self) -> std::thread::Result<()> {
if self.t_cleanup.is_some() {
info!("websocket thread - shutting down");
self.exit.store(true, Ordering::Relaxed);
let x = self.t_cleanup.take().unwrap().join();
info!("websocket thread - shut down.");
x
} else {
warn!("websocket thread - already shut down.");
Ok(())
}
}
}
const SLOT_OPERATION: &str = "program";
pub struct PubsubClient {}
impl PubsubClient {
pub fn program_subscribe(
url: &str,
program: &Pubkey,
) -> Result<
(
PubsubClientSubscription<ProgramNotificationMessage>,
Receiver<ProgramNotificationMessage>,
),
PubsubClientError,
> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<ProgramNotificationMessage>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id =
PubsubClientSubscription::<ProgramNotificationMessage>::send_subscribe(
&socket_clone,
SLOT_OPERATION,
program,
)
.unwrap();
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
let message: Result<ProgramNotificationMessage, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg.clone()) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
} else {
info!("receive error: {:?}", message);
break;
}
}
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<ProgramNotificationMessage> =
PubsubClientSubscription {
message_type: PhantomData,
operation: SLOT_OPERATION,
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
}
fn from_bs58<'de, D>(deserializer: D) -> Result<Pubkey, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
Pubkey::from_str(s.as_str()).map_err(D::Error::custom)
}
fn bytes_from_bs58<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
bs58::decode(s.as_str())
.into_vec()
.map_err(D::Error::custom)
}

2340
solana/bridge/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -203,6 +203,8 @@ impl Bridge {
// Initialize transfer
transfer.is_initialized = true;
transfer.nonce = t.nonce;
transfer.source_address = sender_account_info.key.to_bytes();
transfer.foreign_address = t.target;
transfer.amount = t.amount;
transfer.to_chain_id = t.chain_id;

View File

@ -52,10 +52,14 @@ pub struct TransferOutProposal {
pub amount: U256,
/// chain id to transfer to
pub to_chain_id: u8,
/// address the transfer was initiated from
pub source_address: ForeignAddress,
/// address on the foreign chain to transfer to
pub foreign_address: ForeignAddress,
/// asset that is being transferred
pub asset: AssetMeta,
/// nonce of the transfer
pub nonce: u32,
/// vaa to unlock the tokens on the foreign chain
pub vaa: VAAData,
/// time the vaa was submitted
@ -210,6 +214,7 @@ impl Bridge {
}
Ok(mut_ref)
}
/// Unpacks a state from a bytes buffer without checking that the state is initialized.
pub fn unpack_unchecked<T: IsInitialized>(input: &mut [u8]) -> Result<&mut T, ProgramError> {
if input.len() != size_of::<T>() {
@ -219,6 +224,15 @@ impl Bridge {
Ok(unsafe { &mut *(&mut input[0] as *mut u8 as *mut T) })
}
/// Unpacks a state from a bytes buffer while assuring that the state is initialized.
pub fn unpack_immutable<T: IsInitialized>(input: &[u8]) -> Result<&T, ProgramError> {
let mut_ref: &T = Self::unpack_unchecked_immutable(input)?;
if !mut_ref.is_initialized() {
return Err(Error::UninitializedState.into());
}
Ok(mut_ref)
}
/// Unpacks a state from a bytes buffer without checking that the state is initialized.
pub fn unpack_unchecked_immutable<T: IsInitialized>(input: &[u8]) -> Result<&T, ProgramError> {
if input.len() != size_of::<T>() {

3230
solana/cli/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -14,7 +14,16 @@ solana-logger = { version = "1.3.1" }
solana-sdk = { version = "1.3.1" }
solana-client = { version = "1.3.1" }
solana-faucet = "1.3.1"
solana-transaction-status = "1.3.1"
spl-token = { package = "spl-token", git = "https://github.com/solana-labs/solana-program-library" }
wormhole-bridge = { path = "../bridge" }
primitive-types = {version ="0.7.2", default-features = false}
primitive-types = {version ="0.7.2"}
hex = "0.4.2"
thiserror = "1.0.20"
tungstenite = "0.11.1"
serde = "1.0.103"
url="2.1.1"
serde_bytes ="0.11.5"
log ="0.4.8"
serde_derive = "1.0.103"
serde_json = "1.0.57"

View File

@ -6,7 +6,10 @@ use clap::{
crate_description, crate_name, crate_version, value_t, value_t_or_exit, App, AppSettings, Arg,
SubCommand,
};
use hex;
use primitive_types::U256;
use solana_clap_utils::input_parsers::value_of;
use solana_clap_utils::input_validators::is_derivation;
use solana_clap_utils::{
input_parsers::{keypair_of, pubkey_of},
input_validators::{is_amount, is_keypair, is_pubkey_or_keypair, is_url},
@ -28,10 +31,6 @@ use spl_token::{
state::{Account, Mint},
};
use hex;
use solana_clap_utils::input_parsers::value_of;
use solana_clap_utils::input_validators::is_derivation;
use spl_bridge::instruction::*;
use spl_bridge::state::*;
use spl_bridge::syscalls::RawKey;