Expose weberver in the crnak instead of websockets

This commit is contained in:
Sam Schetterer 2020-09-18 13:52:06 +08:00 committed by Sebastian Conybeare
parent fa2d3da93d
commit aee849f900
3 changed files with 466 additions and 298 deletions

675
crank/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -19,5 +19,6 @@ sloggers = "1.0"
slog-scope = "4.3"
slog-stdlog = "4"
log = "0.4"
tungstenite = {version="0.1", default-features=false}
debug_print = "1.0.0"
tokio = {version = "0.2", features = ["full"]}
warp = "0.2"

View File

@ -36,6 +36,7 @@ use std::mem::size_of;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::{thread, time};
use warp::Filter;
use std::sync::mpsc::{Sender, Receiver};
use sloggers::file::FileLoggerBuilder;
@ -325,17 +326,12 @@ fn main() -> Result<()> {
market,
port,
} => {
let (send, recv) = std::sync::mpsc::channel();
let queue_send = send.clone();
let client = opts.client();
let _ = std::thread::spawn(move || accept_loop(port, send));
let websockets = std::thread::spawn(move || websockets_loop(recv));
let _ = std::thread::spawn(move || read_queue_length_loop(client,
dex_program_id,
market,
queue_send));
// Failures in the others will propagate to this loop via timeout
websockets.join();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(read_queue_length_loop(client,
dex_program_id,
market,
port));
}
Command::PrintEventQueue {
ref dex_program_id,
@ -1398,59 +1394,29 @@ enum MonitorEvent {
NewConn(std::net::TcpStream),
}
fn accept_loop(port: u16, mut send: Sender<MonitorEvent>) {
let address = format!("127.0.0.1:{}", port);
let listener = std::net::TcpListener::bind(&address).unwrap();
for stream in listener.incoming() {
send.send(MonitorEvent::NewConn(stream.unwrap())).unwrap();
}
}
fn websockets_loop(mut recv: Receiver<MonitorEvent>) {
let mut websockets: Vec<tungstenite::WebSocket<std::net::TcpStream>> = Vec::new();
let recv_every = time::Duration::from_millis(10000);
while let Ok(value) = recv.recv_timeout(recv_every) {
match value {
MonitorEvent::NumEvents(events) => {
let message = format!("{{ \"events_in_queue\": {} }}", events);
let message = tungstenite::Message::Text(message);
for socket in &mut websockets {
socket.write_message(message.clone()).unwrap();
}
},
MonitorEvent::NewConn(conn) => {
// Tungstenite errors don't implement debug so we can't unwrap?
// Generally we just die here anyways
if let Ok(conn) = tungstenite::accept(conn) {
websockets.push(conn);
} else {
panic!("Couldn't accept websocket stream for unknown reason");
}
}
}
}
}
fn read_queue_length_loop(
async fn read_queue_length_loop(
client: RpcClient,
program_id: Pubkey,
market: Pubkey,
sender: std::sync::mpsc::Sender<MonitorEvent>
port: u16,
) -> Result<()> {
let market_keys = get_keys_for_market(&client, &program_id, &market)?;
loop {
let event_q_data = client
.get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent())?
.value
.expect("Failed to retrieve account")
.data;
let inner: Cow<[u64]> = remove_dex_account_padding(&event_q_data)?;
let (header, seg0, seg1) = parse_event_queue(&inner)?;
let event_q_len = seg0.len() + seg1.len();
let client = std::sync::Arc::new(client);
let get_data = warp::path("length")
.map(move || {
let client = client.clone();
let market_keys = get_keys_for_market(&client, &program_id, &market).unwrap();
let event_q_data = client
.get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent()).unwrap()
.value
.expect("Failed to retrieve account")
.data;
let inner: Cow<[u64]> = remove_dex_account_padding(&event_q_data).unwrap();
let (header, seg0, seg1) = parse_event_queue(&inner).unwrap();
let len = seg0.len() + seg1.len();
format!("{{ \"length\": {} }}", len)
});
sender.send(MonitorEvent::NumEvents(event_q_len)).unwrap();
let send_every = time::Duration::from_millis(3000);
thread::sleep(send_every);
}
Ok(warp::serve(get_data)
.run(([127, 0, 0, 1], port))
.await)
}