diff --git a/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs b/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs index f9c59d5..a7420d1 100644 --- a/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs +++ b/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs @@ -5,7 +5,6 @@ use { SlotUpdate, SubscribeRequest, Update, }, bs58, - futures_util::FutureExt, log::*, serde_derive::Deserialize, serde_json, @@ -14,7 +13,7 @@ use { Result as PluginResult, SlotStatus, }, std::{fs::File, io::Read}, - tokio::sync::{broadcast, mpsc, oneshot}, + tokio::sync::{broadcast, mpsc}, tonic::transport::Server, }; @@ -83,9 +82,9 @@ pub mod accountsdb_service { } pub struct PluginData { - runtime: tokio::runtime::Runtime, + runtime: Option, server_broadcast: broadcast::Sender, - server_exit_sender: Option>, + server_exit_sender: Option>, accounts_selector: AccountsSelector, } @@ -152,29 +151,35 @@ impl AccountsDbPlugin for Plugin { })?; let service = accountsdb_service::Service::new(config.service_config); - let (server_exit_sender, server_exit_receiver) = oneshot::channel::<()>(); + let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1); let server_broadcast = service.sender.clone(); let server = accountsdb_proto::accounts_db_server::AccountsDbServer::new(service); let runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.spawn( - Server::builder() - .add_service(server) - .serve_with_shutdown(addr, server_exit_receiver.map(drop)), - ); + runtime.spawn(Server::builder().add_service(server).serve_with_shutdown( + addr, + async move { + let _ = server_exit_receiver.recv().await; + }, + )); let server_broadcast_c = server_broadcast.clone(); + let mut server_exit_receiver = server_exit_sender.subscribe(); runtime.spawn(async move { loop { // Don't care about the error if there are no receivers. let _ = server_broadcast_c.send(Update { update_oneof: Some(UpdateOneof::Ping(Ping {})), }); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + tokio::select! { + _ = server_exit_receiver.recv() => { break; }, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}, + } } }); self.data = Some(PluginData { - runtime, + runtime: Some(runtime), server_broadcast, server_exit_sender: Some(server_exit_sender), accounts_selector, @@ -186,14 +191,17 @@ impl AccountsDbPlugin for Plugin { fn on_unload(&mut self) { info!("Unloading plugin: {:?}", self.name()); - let data = self.data.as_mut().expect("plugin must be initialized"); + let mut data = self.data.take().expect("plugin must be initialized"); data.server_exit_sender .take() .expect("on_unload can only be called once") .send(()) .expect("sending grpc server termination should succeed"); - // TODO: explicitly shut down runtime? + data.runtime + .take() + .expect("must exist") + .shutdown_background(); } fn update_account( diff --git a/connector-lib/src/main.rs b/connector-lib/src/main.rs index 102a9b4..f1d0d41 100644 --- a/connector-lib/src/main.rs +++ b/connector-lib/src/main.rs @@ -120,7 +120,6 @@ impl AccountTable for RawAccountTable { } } -use postgres_types::ToSql; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let args: Vec = std::env::args().collect(); diff --git a/connector-lib/src/mango.rs b/connector-lib/src/mango.rs index d79a657..c6cd678 100644 --- a/connector-lib/src/mango.rs +++ b/connector-lib/src/mango.rs @@ -2,11 +2,9 @@ use { async_trait::async_trait, bytes::{BufMut, BytesMut}, fixed::types::I80F48, - log::*, mango::state::{DataType, MangoAccount, MangoCache, MangoGroup}, mango_common::Loadable, postgres_types::{IsNull, ToSql, Type}, - solana_sdk::pubkey::Pubkey, std::{cmp, error, mem}, }; diff --git a/connector-lib/src/postgres_target.rs b/connector-lib/src/postgres_target.rs index 651a236..294bac2 100644 --- a/connector-lib/src/postgres_target.rs +++ b/connector-lib/src/postgres_target.rs @@ -3,7 +3,7 @@ use log::*; use postgres_query::{query, query_dyn}; use std::{collections::HashMap, time::Duration}; -use crate::{AccountTable, AccountTables, AccountWrite, SlotUpdate}; +use crate::{AccountTables, AccountWrite, SlotUpdate}; async fn postgres_connection( connection_string: &str, diff --git a/connector-lib/src/websocket_source.rs b/connector-lib/src/websocket_source.rs index d5ec2f8..42c5d9f 100644 --- a/connector-lib/src/websocket_source.rs +++ b/connector-lib/src/websocket_source.rs @@ -31,7 +31,7 @@ async fn feed_data( sender: async_channel::Sender, ) -> Result<(), anyhow::Error> { let program_id = Pubkey::from_str("mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68")?; - let mango_group_address = Pubkey::from_str("98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue")?; + //let mango_group_address = Pubkey::from_str("98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue")?; let snapshot_duration = Duration::from_secs(300); let connect = ws::try_connect::(&config.rpc_ws_url).map_err_anyhow()?;