Compare commits
4 Commits
0a09ad81ae
...
1c41e94e28
Author | SHA1 | Date |
---|---|---|
Maximilian Schneider | 1c41e94e28 | |
Maximilian Schneider | 6287f1c344 | |
Maximilian Schneider | 17ba55eac6 | |
Maximilian Schneider | dc74c89626 |
|
@ -38,7 +38,7 @@ pub async fn init(client: Arc<RpcClient>) -> Arc<RwLock<Hash>> {
|
||||||
let join_hdl = {
|
let join_hdl = {
|
||||||
// create a thread-local reference to blockhash
|
// create a thread-local reference to blockhash
|
||||||
let blockhash_c = blockhash.clone();
|
let blockhash_c = blockhash.clone();
|
||||||
spawn(async move { poll_loop(blockhash_c, client) })
|
spawn(async move { poll_loop(blockhash_c, client).await })
|
||||||
};
|
};
|
||||||
|
|
||||||
return blockhash;
|
return blockhash;
|
||||||
|
|
|
@ -3,69 +3,21 @@ mod transaction_builder;
|
||||||
mod transaction_sender;
|
mod transaction_sender;
|
||||||
|
|
||||||
use anchor_client::{
|
use anchor_client::{
|
||||||
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
|
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
|
||||||
Cluster,
|
Cluster,
|
||||||
};
|
};
|
||||||
use anchor_lang::prelude::Pubkey;
|
use anchor_lang::prelude::Pubkey;
|
||||||
use bytemuck::cast_slice;
|
use bytemuck::bytes_of;
|
||||||
use client::{Client, MangoGroupContext};
|
use client::{Client, MangoGroupContext};
|
||||||
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
|
||||||
use futures_util::{
|
|
||||||
future::{self, Ready},
|
|
||||||
pin_mut, SinkExt, StreamExt, TryStreamExt,
|
|
||||||
};
|
|
||||||
use log::*;
|
use log::*;
|
||||||
use solana_client::{nonblocking::blockhash_query, nonblocking::rpc_client::RpcClient};
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
use std::{
|
use std::{collections::HashSet, fs::File, io::Read, str::FromStr, sync::Arc, time::Duration};
|
||||||
collections::{HashMap, HashSet},
|
|
||||||
convert::identity,
|
|
||||||
fs::File,
|
|
||||||
io::Read,
|
|
||||||
net::SocketAddr,
|
|
||||||
str::FromStr,
|
|
||||||
sync::Arc,
|
|
||||||
sync::{atomic::AtomicBool, Mutex},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tokio::{
|
|
||||||
net::{TcpListener, TcpStream},
|
|
||||||
pin, time,
|
|
||||||
};
|
|
||||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use solana_geyser_connector_lib::FilterConfig;
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
fill_event_filter::{self, FillCheckpoint},
|
|
||||||
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
|
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
|
||||||
};
|
};
|
||||||
use solana_geyser_connector_lib::{
|
|
||||||
metrics::{MetricType, MetricU64},
|
|
||||||
FilterConfig, StatusResponse,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
|
||||||
#[serde(tag = "command")]
|
|
||||||
pub enum Command {
|
|
||||||
#[serde(rename = "subscribe")]
|
|
||||||
Subscribe(SubscribeCommand),
|
|
||||||
#[serde(rename = "unsubscribe")]
|
|
||||||
Unsubscribe(UnsubscribeCommand),
|
|
||||||
#[serde(rename = "getMarkets")]
|
|
||||||
GetMarkets,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct SubscribeCommand {
|
|
||||||
pub market_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct UnsubscribeCommand {
|
|
||||||
pub market_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub source: SourceConfig,
|
pub source: SourceConfig,
|
||||||
|
@ -110,22 +62,17 @@ async fn main() -> anyhow::Result<()> {
|
||||||
Some(rpc_timeout),
|
Some(rpc_timeout),
|
||||||
);
|
);
|
||||||
let group_pk = Pubkey::from_str(&config.mango_group).unwrap();
|
let group_pk = Pubkey::from_str(&config.mango_group).unwrap();
|
||||||
let group_context = Arc::new(
|
let group_context =
|
||||||
MangoGroupContext::new_from_rpc(
|
Arc::new(MangoGroupContext::new_from_rpc(&client.rpc_async(), group_pk).await?);
|
||||||
&client.rpc_async(),
|
|
||||||
group_pk
|
|
||||||
)
|
|
||||||
.await?,
|
|
||||||
);
|
|
||||||
|
|
||||||
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
let perp_queue_pks: Vec<_> = group_context
|
||||||
.perp_markets
|
.perp_markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| (context.address, context.market.event_queue))
|
.map(|(_, context)| (context.address, context.market.event_queue))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// fetch all serum/openbook markets to find their event queues
|
// fetch all serum/openbook markets to find their event queues
|
||||||
let serum_market_pks: Vec<Pubkey> = group_context
|
let serum_market_pks: Vec<_> = group_context
|
||||||
.serum3_markets
|
.serum3_markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| context.market.serum_market_external)
|
.map(|(_, context)| context.market.serum_market_external)
|
||||||
|
@ -136,7 +83,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.get_multiple_accounts(serum_market_pks.as_slice())
|
.get_multiple_accounts(serum_market_pks.as_slice())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let serum_market_ais: Vec<&Account> = serum_market_ais
|
let serum_market_ais: Vec<_> = serum_market_ais
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|maybe_ai| match maybe_ai {
|
.filter_map(|maybe_ai| match maybe_ai {
|
||||||
Some(ai) => Some(ai),
|
Some(ai) => Some(ai),
|
||||||
|
@ -144,17 +91,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
|
let serum_queue_pks: Vec<_> = serum_market_ais
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|pair| {
|
.map(|pair| {
|
||||||
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
||||||
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
||||||
);
|
);
|
||||||
(
|
let event_q = market_state.event_q;
|
||||||
serum_market_pks[pair.0],
|
(serum_market_pks[pair.0], Pubkey::new(bytes_of(&event_q)))
|
||||||
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
@ -167,6 +112,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
)
|
)
|
||||||
.expect("init transaction builder");
|
.expect("init transaction builder");
|
||||||
|
|
||||||
|
// TODO: throttle cranking, currently runs very fast
|
||||||
|
// TODO: use real keypair from config / env
|
||||||
transaction_sender::init(instruction_receiver, blockhash, rpc_client, Keypair::new());
|
transaction_sender::init(instruction_receiver, blockhash, rpc_client, Keypair::new());
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
@ -179,11 +126,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.collect::<String>()
|
.collect::<String>()
|
||||||
);
|
);
|
||||||
let use_geyser = true;
|
let use_geyser = true;
|
||||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
|
||||||
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
|
.iter()
|
||||||
|
.chain(serum_queue_pks.iter())
|
||||||
|
.map(|mkt| mkt.1)
|
||||||
|
.collect();
|
||||||
|
|
||||||
let filter_config = FilterConfig {
|
let filter_config = FilterConfig {
|
||||||
program_ids: vec![],
|
program_ids: vec![],
|
||||||
account_ids: relevant_pubkeys,
|
account_ids: all_queue_pks.iter().map(|pk| pk.to_string()).collect(),
|
||||||
};
|
};
|
||||||
if use_geyser {
|
if use_geyser {
|
||||||
grpc_plugin_source::process_events(
|
grpc_plugin_source::process_events(
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::Pubkey;
|
|
||||||
use bytemuck::cast_ref;
|
use bytemuck::cast_ref;
|
||||||
|
use serum_dex::{instruction::MarketInstruction, state::EventView};
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
chain_data::{AccountData, ChainData, SlotData},
|
chain_data::{AccountData, ChainData, SlotData},
|
||||||
metrics::Metrics,
|
metrics::Metrics,
|
||||||
|
@ -11,13 +11,15 @@ use anchor_lang::AccountDeserialize;
|
||||||
use log::*;
|
use log::*;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
account::{ReadableAccount, WritableAccount},
|
account::{ReadableAccount, WritableAccount},
|
||||||
instruction::{Instruction, AccountMeta},
|
instruction::{AccountMeta, Instruction},
|
||||||
|
pubkey::Pubkey,
|
||||||
stake_history::Epoch,
|
stake_history::Epoch,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
borrow::BorrowMut,
|
borrow::BorrowMut,
|
||||||
collections::{HashMap, HashSet}, iter::{once, empty},
|
collections::{BTreeSet, HashMap, HashSet},
|
||||||
convert::TryFrom
|
convert::TryFrom,
|
||||||
|
str::FromStr,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn init(
|
pub fn init(
|
||||||
|
@ -44,26 +46,20 @@ pub fn init(
|
||||||
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
|
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
|
||||||
|
|
||||||
let mut chain_cache = ChainData::new(metrics_sender);
|
let mut chain_cache = ChainData::new(metrics_sender);
|
||||||
let mut perp_events_cache = HashMap::<
|
|
||||||
String,
|
|
||||||
[mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize],
|
|
||||||
>::new();
|
|
||||||
let mut serum_events_cache = HashMap::<String, Vec<serum_dex::state::Event>>::new();
|
|
||||||
let mut seq_num_cache = HashMap::<String, u64>::new();
|
|
||||||
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
||||||
|
|
||||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
|
||||||
let relevant_pubkeys = all_queue_pks
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|m| m.1)
|
.chain(serum_queue_pks.iter())
|
||||||
.collect::<HashSet<Pubkey>>();
|
.map(|mkt| mkt.1)
|
||||||
|
.collect();
|
||||||
|
|
||||||
// update handling thread, reads both sloths and account updates
|
// update handling thread, reads both sloths and account updates
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok(account_write) = account_write_queue_receiver.recv() => {
|
Ok(account_write) = account_write_queue_receiver.recv() => {
|
||||||
if !relevant_pubkeys.contains(&account_write.pubkey) {
|
if !all_queue_pks.contains(&account_write.pubkey) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,37 +89,36 @@ pub fn init(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for mkt in all_queue_pks.iter() {
|
for (mkt_pk, evq_pk) in perp_queue_pks.iter() {
|
||||||
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
|
let evq_b58 = evq_pk.to_string();
|
||||||
let mkt_pk = mkt.0;
|
let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0));
|
||||||
let evq_pk = mkt.1;
|
|
||||||
|
|
||||||
match chain_cache.account(&evq_pk) {
|
match chain_cache.account(&evq_pk) {
|
||||||
Ok(account_info) => {
|
Ok(account_info) => {
|
||||||
// only process if the account state changed
|
// only process if the account state changed
|
||||||
let evq_version = (account_info.slot, account_info.write_version);
|
let evq_version = (account_info.slot, account_info.write_version);
|
||||||
let evq_pk_string = evq_pk.to_string();
|
trace!("mango perp evq={evq_b58} write_version={:?}", evq_version);
|
||||||
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
|
|
||||||
if evq_version == *last_evq_version {
|
if evq_version == *last_evq_version {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
|
last_evq_versions.insert(evq_b58.clone(), evq_version);
|
||||||
|
|
||||||
let account = &account_info.account;
|
let account = &account_info.account;
|
||||||
let is_perp = mango_v4::check_id(account.owner());
|
|
||||||
if is_perp {
|
let event_queue: mango_v4::state::EventQueue =
|
||||||
let event_queue: mango_v4::state::EventQueue = mango_v4::state::EventQueue::try_deserialize(
|
mango_v4::state::EventQueue::try_deserialize(
|
||||||
account.data().borrow_mut(),
|
account.data().borrow_mut(),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
trace!(
|
|
||||||
"evq {} seq_num {}",
|
|
||||||
evq_pk_string,
|
|
||||||
event_queue.header.seq_num
|
|
||||||
);
|
|
||||||
|
|
||||||
if !event_queue.empty() {
|
if !event_queue.empty() {
|
||||||
let mango_accounts: HashSet<_> = event_queue.iter().take(10).flat_map(|e| match mango_v4::state::EventType::try_from(e.event_type).expect("mango v4 event") {
|
let mango_accounts: BTreeSet<_> = event_queue
|
||||||
|
.iter()
|
||||||
|
.take(10)
|
||||||
|
.flat_map(|e| {
|
||||||
|
match mango_v4::state::EventType::try_from(e.event_type)
|
||||||
|
.expect("mango v4 event")
|
||||||
|
{
|
||||||
mango_v4::state::EventType::Fill => {
|
mango_v4::state::EventType::Fill => {
|
||||||
let fill: &mango_v4::state::FillEvent = cast_ref(e);
|
let fill: &mango_v4::state::FillEvent = cast_ref(e);
|
||||||
vec![fill.maker, fill.taker]
|
vec![fill.maker, fill.taker]
|
||||||
|
@ -132,81 +127,120 @@ pub fn init(
|
||||||
let out: &mango_v4::state::OutEvent = cast_ref(e);
|
let out: &mango_v4::state::OutEvent = cast_ref(e);
|
||||||
vec![out.owner]
|
vec![out.owner]
|
||||||
}
|
}
|
||||||
mango_v4::state::EventType::Liquidate => vec![]
|
mango_v4::state::EventType::Liquidate => vec![],
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
|
let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
|
||||||
&mango_v4::accounts::PerpConsumeEvents {
|
&mango_v4::accounts::PerpConsumeEvents {
|
||||||
group: group_pk,
|
group: group_pk,
|
||||||
perp_market: mkt_pk,
|
perp_market: *mkt_pk,
|
||||||
event_queue: evq_pk,
|
event_queue: *evq_pk,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
ams.append(
|
||||||
ams.append(&mut mango_accounts
|
&mut mango_accounts
|
||||||
.iter()
|
.iter()
|
||||||
.map(|pk| AccountMeta { pubkey: *pk, is_signer: false, is_writable: true })
|
.map(|pk| AccountMeta::new(*pk, false))
|
||||||
.collect());
|
.collect(),
|
||||||
|
);
|
||||||
|
|
||||||
let ix = Instruction {
|
let ix = Instruction {
|
||||||
program_id: mango_v4::id(),
|
program_id: mango_v4::id(),
|
||||||
accounts: ams,
|
accounts: ams,
|
||||||
data: anchor_lang::InstructionData::data(&mango_v4::instruction::PerpConsumeEvents {
|
data: anchor_lang::InstructionData::data(
|
||||||
limit: 10,
|
&mango_v4::instruction::PerpConsumeEvents { limit: 10 },
|
||||||
}),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"mango perp evq={evq_b58} count={} limit=10",
|
||||||
|
event_queue.iter().count()
|
||||||
|
);
|
||||||
instruction_sender.send(vec![ix]).await;
|
instruction_sender.send(vec![ix]).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match seq_num_cache.get(&evq_pk_string) {
|
|
||||||
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
|
|
||||||
Some(old_events) => {}
|
|
||||||
_ => {
|
|
||||||
info!("perp_events_cache could not find {}", evq_pk_string)
|
|
||||||
}
|
}
|
||||||
},
|
Err(_) => info!("chain_cache could not find {evq_b58}"),
|
||||||
_ => info!("seq_num_cache could not find {}", evq_pk_string),
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
seq_num_cache
|
for (mkt_pk, evq_pk) in serum_queue_pks.iter() {
|
||||||
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
|
let evq_b58 = evq_pk.to_string();
|
||||||
perp_events_cache
|
let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0));
|
||||||
.insert(evq_pk_string.clone(), event_queue.buf.clone());
|
|
||||||
} else {
|
match chain_cache.account(&evq_pk) {
|
||||||
|
Ok(account_info) => {
|
||||||
|
// only process if the account state changed
|
||||||
|
let evq_version = (account_info.slot, account_info.write_version);
|
||||||
|
trace!("serum evq={evq_b58} write_version={:?}", evq_version);
|
||||||
|
if evq_version == *last_evq_version {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
last_evq_versions.insert(evq_b58.clone(), evq_version);
|
||||||
|
|
||||||
|
let account = &account_info.account;
|
||||||
|
|
||||||
let inner_data = &account.data()[5..&account.data().len() - 7];
|
let inner_data = &account.data()[5..&account.data().len() - 7];
|
||||||
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
|
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
|
||||||
let header: SerumEventQueueHeader =
|
let header: SerumEventQueueHeader =
|
||||||
*bytemuck::from_bytes(&inner_data[..header_span]);
|
*bytemuck::from_bytes(&inner_data[..header_span]);
|
||||||
let seq_num = header.seq_num;
|
|
||||||
let count = header.count;
|
let count = header.count;
|
||||||
|
|
||||||
|
if count > 0 {
|
||||||
let rest = &inner_data[header_span..];
|
let rest = &inner_data[header_span..];
|
||||||
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
|
let event_size = std::mem::size_of::<serum_dex::state::Event>();
|
||||||
let new_len = rest.len() - slop;
|
let slop = rest.len() % event_size;
|
||||||
let events = &rest[..new_len];
|
let end = rest.len() - slop;
|
||||||
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
|
let events =
|
||||||
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
|
bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]);
|
||||||
|
let seq_num = header.seq_num;
|
||||||
|
|
||||||
match seq_num_cache.get(&evq_pk_string) {
|
let oo_pks: BTreeSet<_> = (0..count)
|
||||||
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
|
.map(|i| {
|
||||||
Some(old_events) => {}
|
let offset = (seq_num - count + i) % events.len() as u64;
|
||||||
_ => {
|
let event: serum_dex::state::Event = events[offset as usize];
|
||||||
info!("serum_events_cache could not find {}", evq_pk_string)
|
let oo_pk = match event.as_view().unwrap() {
|
||||||
}
|
EventView::Fill { owner, .. }
|
||||||
},
|
| EventView::Out { owner, .. } => {
|
||||||
_ => info!("seq_num_cache could not find {}", evq_pk_string),
|
bytemuck::cast_slice::<u64, Pubkey>(&owner)[0]
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
oo_pk
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
|
let mut ams: Vec<_> = oo_pks
|
||||||
serum_events_cache
|
.iter()
|
||||||
.insert(evq_pk_string.clone(), events.clone().to_vec());
|
.map(|pk| AccountMeta::new(*pk, false))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size
|
||||||
|
ams.append(
|
||||||
|
&mut [
|
||||||
|
mkt_pk, evq_pk, evq_pk, /*coin_pk*/
|
||||||
|
evq_pk, /*pc_pk*/
|
||||||
|
]
|
||||||
|
.iter()
|
||||||
|
.map(|pk| AccountMeta::new(**pk, false))
|
||||||
|
.collect(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let ix = Instruction {
|
||||||
|
program_id: Pubkey::from_str(
|
||||||
|
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX",
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
accounts: ams,
|
||||||
|
data: MarketInstruction::ConsumeEvents(count as u16).pack(),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("serum evq={evq_b58} count={count}");
|
||||||
|
instruction_sender.send(vec![ix]).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => info!("chain_cache could not find {}", mkt.1),
|
Err(_) => info!("chain_cache could not find {evq_b58}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use log::*;
|
||||||
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
|
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer,
|
hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer,
|
||||||
|
@ -12,12 +13,14 @@ pub async fn send_loop(
|
||||||
client: Arc<RpcClient>,
|
client: Arc<RpcClient>,
|
||||||
keypair: Keypair,
|
keypair: Keypair,
|
||||||
) {
|
) {
|
||||||
|
info!("signing with keypair pk={:?}", keypair.pubkey());
|
||||||
let cfg = RpcSendTransactionConfig {
|
let cfg = RpcSendTransactionConfig {
|
||||||
skip_preflight: true,
|
skip_preflight: true,
|
||||||
..RpcSendTransactionConfig::default()
|
..RpcSendTransactionConfig::default()
|
||||||
};
|
};
|
||||||
loop {
|
loop {
|
||||||
if let Ok(ixs) = ixs_rx.recv().await {
|
if let Ok(ixs) = ixs_rx.recv().await {
|
||||||
|
// TODO add priority fee
|
||||||
let tx = Transaction::new_signed_with_payer(
|
let tx = Transaction::new_signed_with_payer(
|
||||||
&ixs,
|
&ixs,
|
||||||
Some(&keypair.pubkey()),
|
Some(&keypair.pubkey()),
|
||||||
|
@ -25,6 +28,7 @@ pub async fn send_loop(
|
||||||
*blockhash.read().unwrap(),
|
*blockhash.read().unwrap(),
|
||||||
);
|
);
|
||||||
// TODO: collect metrics
|
// TODO: collect metrics
|
||||||
|
info!("send tx={:?}", tx.signatures[0]);
|
||||||
client.send_transaction_with_config(&tx, cfg).await;
|
client.send_transaction_with_config(&tx, cfg).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,6 +40,5 @@ pub fn init(
|
||||||
client: Arc<RpcClient>,
|
client: Arc<RpcClient>,
|
||||||
keypair: Keypair,
|
keypair: Keypair,
|
||||||
) {
|
) {
|
||||||
// launch task
|
spawn(async move { send_loop(ixs_rx, blockhash, client, keypair).await });
|
||||||
spawn(async move { send_loop(ixs_rx, blockhash, client, keypair) });
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue