Add: subscription + query with warp

This commit is contained in:
Efremov Alexey 2021-09-10 13:08:15 +03:00
parent 36793998a3
commit a9ba061325
6 changed files with 459 additions and 987 deletions

1211
rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,15 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "3"
env_logger = "0.8"
env_logger = "0.9"
futures = "0.3.1"
log = "0.4.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
warp = "0.3"
async-stream = "0.3"
spl-graphql-server = { path = "../program" }
juniper = "0.15"
juniper_graphql_ws = "0.3.0"
juniper_warp = { version = "0.7.0", features = ["subscriptions"] }

View File

@ -1,29 +1,20 @@
use std::io;
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use spl_graphql_server::schema::{create_schema, Ctx};
use spl_graphql_server::server::AppServer;
#[actix_web::main]
async fn main() -> io::Result<()> {
#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init();
let context = std::sync::Arc::new(RwLock::new(Ctx::new()));
let context = Ctx::new();
let mut ctx = Ctx::clone(&context);
let ctx = Arc::clone(&context);
let server = AppServer::new(create_schema, context);
thread::spawn(move || {
match ctx.try_write() {
Ok(mut c) => {
c.preload();
}
Err(_) => {}
};
ctx.preload();
});
let ctx = Arc::clone(&context);
let schema = std::sync::Arc::new(create_schema());
let server = AppServer::new(schema, ctx);
server.run().await
}

View File

@ -6,13 +6,18 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = "3.3.2"
actix-cors = "0.4.0"
warp = "0.3"
env_logger = "0.8"
serde = "1.0.103"
futures = "0.3"
serde_json = "1.0.44"
serde_derive = "1.0.103"
juniper = "0.15"
juniper_graphql_ws = "0.3.0"
juniper_warp = { version = "0.7.0", features = ["subscriptions"] }
async-stream = "0.3"
solana-client = "1.7.8"
solana-program = "1.7.8"
spl-token-vault = { path = "../../token-vault/program", features = [ "no-entrypoint" ] }

View File

@ -1,30 +1,55 @@
use solana_program::pubkey::Pubkey;
use juniper::{FieldResult, FieldError, EmptySubscription, EmptyMutation, RootNode };
use juniper::{FieldResult, FieldError, EmptyMutation, RootNode, graphql_subscription };
use {
crate::state::SharedState
};
use juniper::{GraphQLEnum, GraphQLObject};
use std::str::FromStr;
use futures::Stream;
use std::sync::{RwLock, Arc};
use std::pin::Pin;
pub struct Ctx(SharedState);
pub struct Ctx {
state: Arc<RwLock<SharedState>>
}
impl Ctx {
pub fn new() -> Ctx {
let state = SharedState::new();
Ctx(state)
Ctx {
state: Arc::new(RwLock::new(state))
}
}
pub fn preload(&mut self) {
self.0.preload()
}
pub fn find_vault(&self, key: &str) -> Option<&spl_token_vault::state::Vault> {
match Pubkey::from_str(key) {
Ok(id) => self.0.vaults.get(&id),
Err(_) => Option::None
pub fn clone(ctx: &Ctx) -> Ctx {
let state = Arc::clone(&ctx.state);
Ctx {
state: state
}
}
pub fn preload<'a>(&'a mut self) {
if let Ok(mut state) = self.state.try_write() {
state.preload()
}
}
pub fn find_vault(&self, key: &str) -> Option<Vault> {
let res = self.state
.try_read()
.map(|st| {
match Pubkey::from_str(key) {
Ok(id) => st.vaults.get(&id).map(|v| Vault::from(v)),
Err(_) => None,
}
});
res.unwrap_or(None)
}
pub fn vaults(&self) -> Vec<Vault> {
self.0.vaults.values().map(|v| Vault::from(v)).collect()
match self.state.try_read() {
Ok(state) => state.vaults.values().map(|v| Vault::from(v)).collect(),
Err(_) => Vec::new(),
}
}
}
@ -129,7 +154,7 @@ pub struct QueryRoot;
impl QueryRoot {
/// get vault by id
fn vault(context: &Ctx, id: String) -> FieldResult<Vault> {
let result = context.find_vault(&id).map(|v| Vault::from(v));
let result = context.find_vault(&id);
if let Some(v) = result {
Ok(v)
} else {
@ -143,8 +168,21 @@ impl QueryRoot {
}
}
pub type Schema = RootNode<'static, QueryRoot, EmptyMutation<Ctx>, EmptySubscription<Ctx>>;
type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
pub struct Subscription;
#[graphql_subscription(context = Ctx)]
impl Subscription {
async fn hello_world() -> StringStream {
let stream = futures::stream::iter(vec![
Ok(String::from("Hello")),
Ok(String::from("World!"))
]);
Box::pin(stream)
}
}
pub type Schema = RootNode<'static, QueryRoot, EmptyMutation<Ctx>, Subscription>;
pub fn create_schema() -> Schema {
Schema::new(QueryRoot {}, EmptyMutation::new(), EmptySubscription::new())
Schema::new(QueryRoot {}, EmptyMutation::new(), Subscription {})
}

View File

@ -1,79 +1,69 @@
use actix_cors::Cors;
use actix_web::{middleware, web, App, HttpResponse, HttpServer};
use juniper::http::graphiql::graphiql_source;
use juniper::http::GraphQLRequest;
use std::{ collections::HashMap, convert::Infallible };
use juniper_warp::subscriptions::serve_graphql_ws;
use juniper_graphql_ws::ConnectionConfig;
use juniper_warp::{playground_filter};
use futures::FutureExt;
use warp::Filter;
use juniper::InputValue;
use std::sync::Arc;
use std::sync::RwLock;
use std::io;
use crate::schema::{Schema, Ctx};
pub struct AppServer {
schema: Arc<Schema>,
context: Arc<RwLock<Ctx>>
}
async fn graphiql() -> HttpResponse {
let html = graphiql_source("http://127.0.0.1:8080/graphql", None);
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body(html)
}
async fn graphql(
st: web::Data<Arc<Schema>>,
ctx: web::Data<Arc<RwLock<Ctx>>>,
data: web::Json<GraphQLRequest>,
) -> Result<HttpResponse, actix_web::Error> {
let user = web::block(move || {
match ctx.get_ref().try_read() {
Ok(context) => {
let res = data.execute_sync(&st, &context);
let json = serde_json::to_string(&res)?;
return Ok::<_, serde_json::error::Error>(json);
}
Err(e) => {
let json_str = format!("{{\"error\":\"{}\"}}", e.to_string());
let json = serde_json::to_string(&json_str)?;
return Ok::<_, serde_json::error::Error>(json);
}
}
}).await?;
Ok(HttpResponse::Ok()
.content_type("application/json")
.body(user))
create_schema: Box<fn() -> Schema>,
context: Ctx
}
impl AppServer {
pub fn new(schema: Arc<Schema>, context: Arc<RwLock<Ctx>>) -> Self {
pub fn new(create_schema: fn() -> Schema, context: Ctx) -> Self {
AppServer {
schema: schema,
context: context
create_schema: Box::new(create_schema),
context: context
}
}
pub async fn run(self) -> io::Result<()> {
let schema = self.schema;
let context = self.context;
// Start http server
HttpServer::new(move || {
App::new()
.data(schema.clone())
.data(context.clone())
.wrap(middleware::Logger::default())
.wrap(
Cors::new()
.allowed_methods(vec!["POST", "GET"])
.supports_credentials()
.max_age(3600)
.finish(),
)
.service(web::resource("/graphql").route(web::post().to(graphql)))
.service(web::resource("/graphiql").route(web::get().to(graphiql)))
pub async fn run(self) {
let qm_schema = (*self.create_schema)();
let base_context = Arc::new(self.context);
let context = Arc::clone(&base_context);
let qm_state = warp::any().map(move || {
return Ctx::clone(&context);
});
let qm_graphql_filter = juniper_warp::make_graphql_filter(qm_schema, qm_state.boxed());
let root_node = Arc::new((*self.create_schema)());
let log = warp::log("warp_subscriptions");
let context = Arc::clone(&base_context);
let routes = (warp::path("subscriptions")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let root_node = Arc::clone(&root_node);
let context = Arc::clone(&context);
let ctx = Ctx::clone(&context);
ws.on_upgrade(move |websocket| async move {
let connection_config = move |_: HashMap<String, InputValue>| async move {
Ok(ConnectionConfig::new(ctx)) as Result<_, Infallible>
};
serve_graphql_ws(websocket, root_node, connection_config)
.map(|r| {
if let Err(e) = r {
println!("Websocket error: {}", e);
}
})
.await
})
}))
.map(|reply| {
// TODO#584: remove this workaround
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
})
.bind("127.0.0.1:8080")?
.run()
.await
}
.or(warp::post()
.and(warp::path("graphql"))
.and(qm_graphql_filter))
.or(warp::get()
.and(warp::path("playground"))
.and(playground_filter("/graphql", Some("/subscriptions"))))
.with(log);
warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}
}