cargo fmt

This commit is contained in:
Christian Kamm 2021-11-01 21:11:19 +01:00
parent 8fe4795733
commit 132f6bcf38
2 changed files with 27 additions and 19 deletions

View File

@ -1,8 +1,8 @@
use { use {
crate::accounts_selector::AccountsSelector, crate::accounts_selector::AccountsSelector,
accountsdb_proto::{ accountsdb_proto::{
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, SlotUpdate, slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
SubscribeRequest, Update, Ping, SlotUpdate, SubscribeRequest, Update,
}, },
bs58, bs58,
futures_util::FutureExt, futures_util::FutureExt,
@ -136,15 +136,14 @@ impl AccountsDbPlugin for Plugin {
let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
let accounts_selector = Self::create_accounts_selector_from_config(&result); let accounts_selector = Self::create_accounts_selector_from_config(&result);
let config: PluginConfig = let config: PluginConfig = serde_json::from_str(&contents).map_err(|err| {
serde_json::from_str(&contents).map_err(|err| { AccountsDbPluginError::ConfigFileReadError {
AccountsDbPluginError::ConfigFileReadError { msg: format!(
msg: format!( "The config file is not in the JSON format expected: {:?}",
"The config file is not in the JSON format expected: {:?}", err
err ),
), }
} })?;
})?;
let addr = config.bind_address.parse().map_err(|err| { let addr = config.bind_address.parse().map_err(|err| {
AccountsDbPluginError::ConfigFileReadError { AccountsDbPluginError::ConfigFileReadError {
@ -168,7 +167,7 @@ impl AccountsDbPlugin for Plugin {
loop { loop {
// Don't care about the error if there are no receivers. // Don't care about the error if there are no receivers.
let _ = server_broadcast_c.send(Update { let _ = server_broadcast_c.send(Update {
update_oneof: Some(UpdateOneof::Ping(Ping{})), update_oneof: Some(UpdateOneof::Ping(Ping {})),
}); });
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }
@ -188,7 +187,9 @@ impl AccountsDbPlugin for Plugin {
info!("Unloading plugin: {:?}", self.name()); info!("Unloading plugin: {:?}", self.name());
let data = self.data.as_mut().expect("plugin must be initialized"); let data = self.data.as_mut().expect("plugin must be initialized");
data.server_exit_sender.take().expect("on_unload can only be called once") data.server_exit_sender
.take()
.expect("on_unload can only be called once")
.send(()) .send(())
.expect("sending grpc server termination should succeed"); .expect("sending grpc server termination should succeed");
@ -204,7 +205,10 @@ impl AccountsDbPlugin for Plugin {
let data = self.data.as_ref().expect("plugin must be initialized"); let data = self.data.as_ref().expect("plugin must be initialized");
match account { match account {
ReplicaAccountInfoVersions::V0_0_1(account) => { ReplicaAccountInfoVersions::V0_0_1(account) => {
if !data.accounts_selector.is_account_selected(account.pubkey, account.owner) { if !data
.accounts_selector
.is_account_selected(account.pubkey, account.owner)
{
return Ok(()); return Ok(());
} }

View File

@ -508,7 +508,8 @@ fn init_postgres(
// Keep only the most recent final write per pubkey // Keep only the most recent final write per pubkey
if newest_final_slot.unwrap_or(-1) < update.slot { if newest_final_slot.unwrap_or(-1) < update.slot {
let query = query!(" \ let query = query!(
" \
DELETE FROM account_write \ DELETE FROM account_write \
USING ( \ USING ( \
SELECT DISTINCT ON(pubkey) pubkey, slot, write_version \ SELECT DISTINCT ON(pubkey) pubkey, slot, write_version \
@ -524,7 +525,7 @@ fn init_postgres(
) \ ) \
)", )",
newest_final_slot = update.slot, newest_final_slot = update.slot,
); );
let result = query.execute(client).await.unwrap(); let result = query.execute(client).await.unwrap();
newest_final_slot = Some(update.slot); newest_final_slot = Some(update.slot);
@ -630,7 +631,10 @@ async fn main() {
let result = out.await; let result = out.await;
assert!(result.is_err()); assert!(result.is_err());
if let Err(err) = result { if let Err(err) = result {
warn!("error during communication with the accountsdb plugin. retrying. {:?}", err); warn!(
"error during communication with the accountsdb plugin. retrying. {:?}",
err
);
} }
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }
@ -678,8 +682,8 @@ async fn main() {
status: status_string.into(), status: status_string.into(),
}) })
.unwrap(); .unwrap();
}, }
accountsdb_proto::update::UpdateOneof::Ping(_) => {}, accountsdb_proto::update::UpdateOneof::Ping(_) => {}
} }
} }