Merge branch 'master' of github.com:blockworks-foundation/solana-accountsdb-connector
This commit is contained in:
commit
cb95574b8a
|
@ -0,0 +1 @@
|
||||||
|
target
|
|
@ -0,0 +1,56 @@
|
||||||
|
name: Publish Docker Image to GCR
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main, master]
|
||||||
|
workflow_call:
|
||||||
|
secrets:
|
||||||
|
GCR_PROJECT:
|
||||||
|
required: false
|
||||||
|
GCR_SA_KEY:
|
||||||
|
required: false
|
||||||
|
|
||||||
|
env:
|
||||||
|
PROJECT_ID: ${{ secrets.GCR_PROJECT }}
|
||||||
|
IMAGE: mango-geyser-services
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
with:
|
||||||
|
submodules: recursive
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
id: buildx
|
||||||
|
uses: docker/setup-buildx-action@master
|
||||||
|
|
||||||
|
# Login to Google Cloud
|
||||||
|
- name: 'Login to Google Cloud'
|
||||||
|
uses: 'google-github-actions/auth@v0'
|
||||||
|
id: auth
|
||||||
|
with:
|
||||||
|
token_format: 'access_token'
|
||||||
|
credentials_json: '${{ secrets.GCR_SA_KEY }}'
|
||||||
|
|
||||||
|
# Login to GCR
|
||||||
|
- name: Login to GCR
|
||||||
|
uses: docker/login-action@v2
|
||||||
|
with:
|
||||||
|
registry: us-docker.pkg.dev
|
||||||
|
username: oauth2accesstoken
|
||||||
|
password: ${{ steps.auth.outputs.access_token }}
|
||||||
|
|
||||||
|
# Build and push the image
|
||||||
|
- name: Build and Push Image
|
||||||
|
uses: docker/build-push-action@v2
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
us-docker.pkg.dev/${{ env.PROJECT_ID }}/gcr.io/${{ env.IMAGE }}:${{ github.sha }}
|
||||||
|
us-docker.pkg.dev/${{ env.PROJECT_ID }}/gcr.io/${{ env.IMAGE }}:latest
|
||||||
|
cache-from: type=gha
|
||||||
|
cache-to: type=gha,mode=max
|
|
@ -0,0 +1,19 @@
|
||||||
|
name: Deploy to Fly
|
||||||
|
|
||||||
|
on: workflow_dispatch
|
||||||
|
|
||||||
|
env:
|
||||||
|
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
deploy:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Setup Fly
|
||||||
|
uses: superfly/flyctl-actions/setup-flyctl@master
|
||||||
|
|
||||||
|
- name: Deploy
|
||||||
|
run: flyctl deploy
|
|
@ -0,0 +1,23 @@
|
||||||
|
# syntax = docker/dockerfile:1.2
|
||||||
|
# Base image containing all binaries, deployed to gcr.io/mango-markets/mango-geyser-services:latest
|
||||||
|
FROM rust:1.59.0 as base
|
||||||
|
RUN cargo install cargo-chef
|
||||||
|
RUN rustup component add rustfmt
|
||||||
|
RUN apt-get update && apt-get install -y clang cmake
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
FROM base AS plan
|
||||||
|
COPY . .
|
||||||
|
RUN cargo chef prepare --recipe-path recipe.json
|
||||||
|
|
||||||
|
FROM base as build
|
||||||
|
COPY --from=plan /app/recipe.json recipe.json
|
||||||
|
RUN cargo chef cook --release --recipe-path recipe.json
|
||||||
|
COPY . .
|
||||||
|
RUN cargo build --release --bin service-mango-fills --bin service-mango-pnl
|
||||||
|
|
||||||
|
FROM debian:bullseye-slim as run
|
||||||
|
RUN apt-get update && apt-get -y install ca-certificates libc6
|
||||||
|
COPY --from=build /app/target/release/service-mango-* /usr/local/bin/
|
||||||
|
COPY --from=build /app/service-mango-pnl/template-config.toml ./pnl-config.toml
|
||||||
|
COPY --from=build /app/service-mango-fills/template-config.toml ./fills-config.toml
|
|
@ -0,0 +1,37 @@
|
||||||
|
app = "mango-geyser-services"
|
||||||
|
|
||||||
|
kill_signal = "SIGINT"
|
||||||
|
kill_timeout = 5
|
||||||
|
|
||||||
|
[build]
|
||||||
|
image = "us-docker.pkg.dev/mango-markets/gcr.io/mango-geyser-services:latest"
|
||||||
|
|
||||||
|
[processes]
|
||||||
|
fills = "service-mango-fills fills-config.toml"
|
||||||
|
pnl = "service-mango-pnl pnl-config.toml"
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
processes = ["fills"]
|
||||||
|
internal_port = 8080
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[[services.ports]]
|
||||||
|
port = "8080"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
type = "connections"
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
processes = ["pnl"]
|
||||||
|
internal_port = 2052
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[[services.ports]]
|
||||||
|
port = "2052"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
type = "connections"
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
|
@ -11,7 +11,7 @@ use futures::{future, future::FutureExt};
|
||||||
use tonic::transport::{Certificate, ClientTlsConfig, Endpoint, Identity};
|
use tonic::transport::{Certificate, ClientTlsConfig, Endpoint, Identity};
|
||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::{collections::HashMap, str::FromStr, time::Duration};
|
use std::{collections::HashMap, env, str::FromStr, time::Duration};
|
||||||
|
|
||||||
pub mod geyser_proto {
|
pub mod geyser_proto {
|
||||||
tonic::include_proto!("accountsdb");
|
tonic::include_proto!("accountsdb");
|
||||||
|
@ -71,8 +71,17 @@ async fn feed_data_geyser(
|
||||||
sender: async_channel::Sender<Message>,
|
sender: async_channel::Sender<Message>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
|
let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
|
||||||
|
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
||||||
let endpoint = Endpoint::from_str(&grpc_config.connection_string)?;
|
'$' => env::var(&grpc_config.connection_string[1..])
|
||||||
|
.expect("reading connection string from env"),
|
||||||
|
_ => grpc_config.connection_string.clone(),
|
||||||
|
};
|
||||||
|
let rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&snapshot_config.rpc_http_url[1..])
|
||||||
|
.expect("reading connection string from env"),
|
||||||
|
_ => snapshot_config.rpc_http_url.clone(),
|
||||||
|
};
|
||||||
|
let endpoint = Endpoint::from_str(&connection_string)?;
|
||||||
let channel = if let Some(tls) = tls_config {
|
let channel = if let Some(tls) = tls_config {
|
||||||
endpoint.tls_config(tls)?
|
endpoint.tls_config(tls)?
|
||||||
} else {
|
} else {
|
||||||
|
@ -162,7 +171,7 @@ async fn feed_data_geyser(
|
||||||
}
|
}
|
||||||
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
|
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
|
||||||
snapshot_needed = false;
|
snapshot_needed = false;
|
||||||
snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
|
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), program_id)).fuse();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -233,16 +242,34 @@ async fn feed_data_geyser(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
|
fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
|
||||||
let server_root_ca_cert =
|
let server_root_ca_cert = match &config.ca_cert_path.chars().next().unwrap() {
|
||||||
std::fs::read(&config.ca_cert_path).expect("reading server root ca cert");
|
'$' => env::var(&config.ca_cert_path[1..])
|
||||||
|
.expect("reading server root ca cert from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.ca_cert_path).expect("reading server root ca cert from file"),
|
||||||
|
};
|
||||||
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
|
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
|
||||||
let client_cert = std::fs::read(&config.client_cert_path).expect("reading client cert");
|
let client_cert = match &config.client_cert_path.chars().next().unwrap() {
|
||||||
let client_key = std::fs::read(&config.client_key_path).expect("reading client key");
|
'$' => env::var(&config.client_cert_path[1..])
|
||||||
|
.expect("reading client cert from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.client_cert_path).expect("reading client cert from file"),
|
||||||
|
};
|
||||||
|
let client_key = match &config.client_key_path.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.client_key_path[1..])
|
||||||
|
.expect("reading client key from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.client_key_path).expect("reading client key from file"),
|
||||||
|
};
|
||||||
let client_identity = Identity::from_pem(client_cert, client_key);
|
let client_identity = Identity::from_pem(client_cert, client_key);
|
||||||
|
let domain_name = match &config.domain_name.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.domain_name[1..]).expect("reading domain name from env"),
|
||||||
|
_ => config.domain_name.clone(),
|
||||||
|
};
|
||||||
ClientTlsConfig::new()
|
ClientTlsConfig::new()
|
||||||
.ca_certificate(server_root_ca_cert)
|
.ca_certificate(server_root_ca_cert)
|
||||||
.identity(client_identity)
|
.identity(client_identity)
|
||||||
.domain_name(&config.domain_name)
|
.domain_name(domain_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_events(
|
pub async fn process_events(
|
||||||
|
|
|
@ -6,7 +6,7 @@ use tokio::{
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
pin,
|
pin,
|
||||||
};
|
};
|
||||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
|
@ -17,16 +17,27 @@ use solana_geyser_connector_lib::{
|
||||||
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
|
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
|
||||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
|
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
|
||||||
|
|
||||||
async fn handle_connection(
|
async fn handle_connection_error(
|
||||||
checkpoint_map: CheckpointMap,
|
checkpoint_map: CheckpointMap,
|
||||||
peer_map: PeerMap,
|
peer_map: PeerMap,
|
||||||
raw_stream: TcpStream,
|
raw_stream: TcpStream,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
|
let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await;
|
||||||
|
if result.is_err() {
|
||||||
|
error!("connection {} error {}", addr, result.unwrap_err());
|
||||||
|
};
|
||||||
|
peer_map.lock().unwrap().remove(&addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_connection(
|
||||||
|
checkpoint_map: CheckpointMap,
|
||||||
|
peer_map: PeerMap,
|
||||||
|
raw_stream: TcpStream,
|
||||||
|
addr: SocketAddr,
|
||||||
|
) -> Result<(), Error> {
|
||||||
info!("ws connected: {}", addr);
|
info!("ws connected: {}", addr);
|
||||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
.await
|
|
||||||
.expect("Error during the ws handshake occurred");
|
|
||||||
let (mut ws_tx, _ws_rx) = ws_stream.split();
|
let (mut ws_tx, _ws_rx) = ws_stream.split();
|
||||||
|
|
||||||
// 1: publish channel in peer map
|
// 1: publish channel in peer map
|
||||||
|
@ -42,22 +53,19 @@ async fn handle_connection(
|
||||||
for (_, ckpt) in checkpoint_map_copy.iter() {
|
for (_, ckpt) in checkpoint_map_copy.iter() {
|
||||||
ws_tx
|
ws_tx
|
||||||
.feed(Message::Text(serde_json::to_string(ckpt).unwrap()))
|
.feed(Message::Text(serde_json::to_string(ckpt).unwrap()))
|
||||||
.await
|
.await?;
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let result_ckpt = ws_tx.flush().await;
|
info!("ws ckpt sent: {}", addr);
|
||||||
info!("ws ckpt sent: {} err: {:?}", addr, result_ckpt);
|
ws_tx.flush().await?;
|
||||||
|
|
||||||
// 3: forward all events from channel to peer socket
|
// 3: forward all events from channel to peer socket
|
||||||
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
|
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
|
||||||
pin_mut!(forward_updates);
|
pin_mut!(forward_updates);
|
||||||
let result_forward = forward_updates.await;
|
forward_updates.await?;
|
||||||
|
|
||||||
info!("ws disconnected: {} err: {:?}", &addr, result_forward);
|
info!("ws disconnected: {}", &addr);
|
||||||
peer_map.lock().unwrap().remove(&addr);
|
Ok(())
|
||||||
result_ckpt.unwrap();
|
|
||||||
result_forward.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
@ -102,16 +110,18 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let message = fill_receiver.recv().await.unwrap();
|
let message = fill_receiver.recv().await.unwrap();
|
||||||
match message {
|
match message {
|
||||||
FillEventFilterMessage::Update(update) => {
|
FillEventFilterMessage::Update(update) => {
|
||||||
info!("ws update {} {:?} fill", update.market, update.status);
|
debug!("ws update {} {:?} fill", update.market, update.status);
|
||||||
|
|
||||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||||
|
|
||||||
for (k, v) in peer_copy.iter_mut() {
|
for (k, v) in peer_copy.iter_mut() {
|
||||||
debug!(" > {}", k);
|
trace!(" > {}", k);
|
||||||
|
|
||||||
let json = serde_json::to_string(&update);
|
let json = serde_json::to_string(&update);
|
||||||
|
let result = v.send(Message::Text(json.unwrap())).await;
|
||||||
v.send(Message::Text(json.unwrap())).await.unwrap()
|
if result.is_err() {
|
||||||
|
error!(
|
||||||
|
"ws update {} {:?} fill could not reach {}",
|
||||||
|
update.market, update.status, k
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
||||||
|
@ -130,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Let's spawn the handling of each connection in a separate task.
|
// Let's spawn the handling of each connection in a separate task.
|
||||||
while let Ok((stream, addr)) = listener.accept().await {
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
tokio::spawn(handle_connection(
|
tokio::spawn(handle_connection_error(
|
||||||
checkpoints.clone(),
|
checkpoints.clone(),
|
||||||
peers.clone(),
|
peers.clone(),
|
||||||
stream,
|
stream,
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
bind_ws_addr = "0.0.0.0:8080"
|
||||||
|
|
||||||
|
[source]
|
||||||
|
dedup_queue_size = 50000
|
||||||
|
rpc_ws_url = ""
|
||||||
|
|
||||||
|
[[source.grpc_sources]]
|
||||||
|
name = "accountsdb-client"
|
||||||
|
connection_string = "$GEYSER_CONNECTION_STRING"
|
||||||
|
retry_connection_sleep_secs = 30
|
||||||
|
|
||||||
|
[source.grpc_sources.tls]
|
||||||
|
ca_cert_path = "$GEYSER_CA_CERT"
|
||||||
|
client_cert_path = "$GEYSER_CLIENT_CERT"
|
||||||
|
client_key_path = "$GEYSER_CLIENT_CERT"
|
||||||
|
domain_name = "$GEYSER_CERT_DOMAIN"
|
||||||
|
|
||||||
|
[source.snapshot]
|
||||||
|
rpc_http_url = "$RPC_HTTP_URL"
|
||||||
|
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "BTC-PERP"
|
||||||
|
event_queue = "7t5Me8RieYKsFpfLEV8jnpqcqswNpyWD95ZqgUXuLV8Z"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "ETH-PERP"
|
||||||
|
event_queue = "9vDfKNPJkCvQv9bzR4JNTGciQC2RVHPVNMMHiVDgT1mw"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "SOL-PERP"
|
||||||
|
event_queue = "31cKs646dt1YkA3zPyxZ7rUAkxTBz279w4XEobFXcAKP"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "MNGO-PERP"
|
||||||
|
event_queue = "7orixrhZpjvofZGWZyyLFxSEt2tfFiost5kHEzd7jdet"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "SRM-PERP"
|
||||||
|
event_queue = "BXSPmdHWP6fMqsCsT6kG8UN9uugAJxdDkQWy87njUQnL"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "RAY-PERP"
|
||||||
|
event_queue = "Css2MQhEvXMTKjp9REVZR9ZyUAYAZAPrnDvRoPxrQkeN"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "FTT-PERP"
|
||||||
|
event_queue = "5pHAhyEphQRVvLqvYF7dziofR52yZWuq8DThQFJvJ7r5"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "ADA-PERP"
|
||||||
|
event_queue = "G6Dsw9KnP4G38hePtedTH6gDfDQmPJGJw8zipBJvKc12"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "BNB-PERP"
|
||||||
|
event_queue = "GmX4qXMpXvs1DuUXNB4eqL1rfF8LeYEjkKgpFeYsm55n"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "AVAX-PERP"
|
||||||
|
event_queue = "5Grgo9kLu692SUcJ6S7jtbi1WkdwiyRWgThAfN1PcvbL"
|
||||||
|
|
||||||
|
[[markets]]
|
||||||
|
name = "GMT-PERP"
|
||||||
|
event_queue = "J2WYiw67VeGkPvmM3fi65H9KxDgCf79fNwspcD3ycubK"
|
|
@ -0,0 +1,27 @@
|
||||||
|
[source]
|
||||||
|
dedup_queue_size = 50000
|
||||||
|
rpc_ws_url = ""
|
||||||
|
|
||||||
|
[[source.grpc_sources]]
|
||||||
|
name = "accountsdb-client"
|
||||||
|
connection_string = "$GEYSER_CONNECTION_STRING"
|
||||||
|
retry_connection_sleep_secs = 30
|
||||||
|
|
||||||
|
[source.grpc_sources.tls]
|
||||||
|
ca_cert_path = "$GEYSER_CA_CERT"
|
||||||
|
client_cert_path = "$GEYSER_CLIENT_CERT"
|
||||||
|
client_key_path = "$GEYSER_CLIENT_CERT"
|
||||||
|
domain_name = "$GEYSER_CERT_DOMAIN"
|
||||||
|
|
||||||
|
[source.snapshot]
|
||||||
|
rpc_http_url = "$RPC_HTTP_URL"
|
||||||
|
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
|
||||||
|
|
||||||
|
[pnl]
|
||||||
|
update_interval_millis = 5000
|
||||||
|
mango_program = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
|
||||||
|
mango_group = "98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue"
|
||||||
|
mango_cache = "EBDRoayCDDUvDgCimta45ajQeXbexv7aKqJubruqpyvu"
|
||||||
|
|
||||||
|
[jsonrpc_server]
|
||||||
|
bind_address = "0.0.0.0:2052"
|
Loading…
Reference in New Issue