Fix warnings and plugin shutdown

This commit is contained in:
Christian Kamm 2021-11-07 11:11:37 +01:00
parent 387d34e511
commit 3088e8b071
5 changed files with 24 additions and 19 deletions

View File

@ -5,7 +5,6 @@ use {
SlotUpdate, SubscribeRequest, Update,
},
bs58,
futures_util::FutureExt,
log::*,
serde_derive::Deserialize,
serde_json,
@ -14,7 +13,7 @@ use {
Result as PluginResult, SlotStatus,
},
std::{fs::File, io::Read},
tokio::sync::{broadcast, mpsc, oneshot},
tokio::sync::{broadcast, mpsc},
tonic::transport::Server,
};
@ -83,9 +82,9 @@ pub mod accountsdb_service {
}
pub struct PluginData {
runtime: tokio::runtime::Runtime,
runtime: Option<tokio::runtime::Runtime>,
server_broadcast: broadcast::Sender<Update>,
server_exit_sender: Option<oneshot::Sender<()>>,
server_exit_sender: Option<broadcast::Sender<()>>,
accounts_selector: AccountsSelector,
}
@ -152,29 +151,35 @@ impl AccountsDbPlugin for Plugin {
})?;
let service = accountsdb_service::Service::new(config.service_config);
let (server_exit_sender, server_exit_receiver) = oneshot::channel::<()>();
let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1);
let server_broadcast = service.sender.clone();
let server = accountsdb_proto::accounts_db_server::AccountsDbServer::new(service);
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(
Server::builder()
.add_service(server)
.serve_with_shutdown(addr, server_exit_receiver.map(drop)),
);
runtime.spawn(Server::builder().add_service(server).serve_with_shutdown(
addr,
async move {
let _ = server_exit_receiver.recv().await;
},
));
let server_broadcast_c = server_broadcast.clone();
let mut server_exit_receiver = server_exit_sender.subscribe();
runtime.spawn(async move {
loop {
// Don't care about the error if there are no receivers.
let _ = server_broadcast_c.send(Update {
update_oneof: Some(UpdateOneof::Ping(Ping {})),
});
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
tokio::select! {
_ = server_exit_receiver.recv() => { break; },
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {},
}
}
});
self.data = Some(PluginData {
runtime,
runtime: Some(runtime),
server_broadcast,
server_exit_sender: Some(server_exit_sender),
accounts_selector,
@ -186,14 +191,17 @@ impl AccountsDbPlugin for Plugin {
fn on_unload(&mut self) {
info!("Unloading plugin: {:?}", self.name());
let data = self.data.as_mut().expect("plugin must be initialized");
let mut data = self.data.take().expect("plugin must be initialized");
data.server_exit_sender
.take()
.expect("on_unload can only be called once")
.send(())
.expect("sending grpc server termination should succeed");
// TODO: explicitly shut down runtime?
data.runtime
.take()
.expect("must exist")
.shutdown_background();
}
fn update_account(

View File

@ -120,7 +120,6 @@ impl AccountTable for RawAccountTable {
}
}
use postgres_types::ToSql;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let args: Vec<String> = std::env::args().collect();

View File

@ -2,11 +2,9 @@ use {
async_trait::async_trait,
bytes::{BufMut, BytesMut},
fixed::types::I80F48,
log::*,
mango::state::{DataType, MangoAccount, MangoCache, MangoGroup},
mango_common::Loadable,
postgres_types::{IsNull, ToSql, Type},
solana_sdk::pubkey::Pubkey,
std::{cmp, error, mem},
};

View File

@ -3,7 +3,7 @@ use log::*;
use postgres_query::{query, query_dyn};
use std::{collections::HashMap, time::Duration};
use crate::{AccountTable, AccountTables, AccountWrite, SlotUpdate};
use crate::{AccountTables, AccountWrite, SlotUpdate};
async fn postgres_connection(
connection_string: &str,

View File

@ -31,7 +31,7 @@ async fn feed_data(
sender: async_channel::Sender<WebsocketMessage>,
) -> Result<(), anyhow::Error> {
let program_id = Pubkey::from_str("mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68")?;
let mango_group_address = Pubkey::from_str("98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue")?;
//let mango_group_address = Pubkey::from_str("98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue")?;
let snapshot_duration = Duration::from_secs(300);
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;