Tidy up repo, upgrade solana/mango

This commit is contained in:
Riordan Panayides 2022-12-16 10:58:04 +00:00
parent 21f614b286
commit c00935bae7
34 changed files with 9 additions and 2254 deletions

View File

@ -1,9 +1,6 @@
# Cargo workspace for the geyser connector and the mango services.
[workspace]
members = [
"geyser-plugin-grpc",
"lib",
"connector-raw",
# "connector-mango",  # disabled: not yet ported to the new mango-v4 APIs
"service-mango-fills",
"service-mango-pnl",
]
@ -12,7 +9,8 @@ members = [
# Crate overrides shared by every workspace member.
[patch.crates-io]
# for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }
# force usage of mango-v4 submodules with 1.14.9 support
anchor-spl = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
anchor-lang = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
anchor-client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
switchboard-v2 = { git = "ssh://git@github.com/blockworks-foundation/sbv2-solana", branch = "mango-v4" }

View File

@ -1,20 +0,0 @@
#!/usr/bin/env bash
# Source:
# https://github.com/solana-labs/solana-accountsdb-plugin-postgres/blob/master/ci/cargo-build-test.sh
#
# CI entry point: builds and tests every host crate with the pinned
# stable toolchain, treating all warnings as errors.
set -e
cd "$(dirname "$0")/.."

# Sets $rust_stable (and installs the toolchain if it is missing).
source ./ci/rust-version.sh stable

export RUSTFLAGS="-D warnings"
# Fix: the variable Rust actually reads is RUST_BACKTRACE (with an
# underscore); RUSTBACKTRACE was silently ignored, so panics in CI
# produced no backtraces.
export RUST_BACKTRACE=1

set -x

# Build/test all host crates
cargo +"$rust_stable" build
cargo +"$rust_stable" test -- --nocapture

exit 0

View File

@ -1,78 +0,0 @@
#!/usr/bin/env bash
# Builds the geyser plugin shared library and copies it into
# <install directory>/lib.
#
# usage: cargo-install-all.sh [+<cargo version>] [--debug] <install directory>
#   +<version>  pins the cargo toolchain (passed through to `cargo`)
#   --debug     builds the debug profile instead of release
set -e

usage() {
  exitcode=0
  if [[ -n "$1" ]]; then
    exitcode=1
    echo "Error: $*"
  fi
  cat <<EOF
usage: $0 [+<cargo version>] [--debug] <install directory>
EOF
  exit $exitcode
}

# Pick the platform's shared-library extension from the CI OS name.
case "$CI_OS_NAME" in
osx)
  libExt=dylib
  ;;
linux)
  libExt=so
  ;;
*)
  echo CI_OS_NAME unsupported
  exit 1
  ;;
esac

maybeRustVersion=
installDir=
buildVariant=release
maybeReleaseFlag=--release

# Parse args: "-..." are options, "+..." selects a toolchain, anything
# else is taken as the install directory.
while [[ -n $1 ]]; do
  if [[ ${1:0:1} = - ]]; then
    if [[ $1 = --debug ]]; then
      maybeReleaseFlag=
      buildVariant=debug
      shift
    else
      usage "Unknown option: $1"
    fi
  elif [[ ${1:0:1} = \+ ]]; then
    maybeRustVersion=$1
    shift
  else
    installDir=$1
    shift
  fi
done

if [[ -z "$installDir" ]]; then
  usage "Install directory not specified"
  exit 1
fi

# Create the install dir if needed and normalize it to an absolute path.
installDir="$(mkdir -p "$installDir"; cd "$installDir"; pwd)"
echo "Install location: $installDir ($buildVariant)"

cd "$(dirname "$0")"/..
SECONDS=0
mkdir -p "$installDir/lib"
(
  set -x
  # shellcheck disable=SC2086 # Don't want to double quote $rust_version
  cargo $maybeRustVersion build $maybeReleaseFlag --lib
)
# GEYSER_PLUGIN_LIB is expected to be exported by the calling CI job.
cp -fv "target/$buildVariant/${GEYSER_PLUGIN_LIB}.$libExt" "$installDir"/lib/
echo "Done after $SECONDS seconds"

View File

@ -1,51 +0,0 @@
#!/usr/bin/env bash
# Builds the geyser plugin in release mode and packages it, together with
# a version.yml manifest, into <name>-<target>.tar.bz2 for CI releases.
set -e
cd "$(dirname "$0")/.."

# Map the CI OS name to a rust target triple.
case "$CI_OS_NAME" in
osx)
  _cputype="$(uname -m)"
  if [[ $_cputype = arm64 ]]; then
    _cputype=aarch64
  fi
  TARGET=${_cputype}-apple-darwin
  ;;
linux)
  TARGET=x86_64-unknown-linux-gnu
  ;;
*)
  echo CI_OS_NAME unsupported
  exit 1
  ;;
esac

RELEASE_BASENAME="${RELEASE_BASENAME:="${GEYSER_PLUGIN_NAME}-release"}"
TARBALL_BASENAME="${TARBALL_BASENAME:="$RELEASE_BASENAME"}"

echo --- Creating release tarball
(
  set -x
  rm -rf "${RELEASE_BASENAME:?}"/
  mkdir "${RELEASE_BASENAME}"/

  COMMIT="$(git rev-parse HEAD)"

  (
    echo "channel: $CI_TAG"
    echo "commit: $COMMIT"
    echo "target: $TARGET"
  ) > "${RELEASE_BASENAME}"/version.yml

  # Make CHANNEL available to include in the software version information
  export CHANNEL

  source ci/rust-version.sh stable
  # Fix: cargo-install-all.sh only treats a leading '+' as the toolchain
  # selector; the bare word "stable" was parsed as an install directory
  # (then overwritten by the next argument), so the toolchain pin was
  # silently dropped and the build used whatever default toolchain was
  # active.
  ci/cargo-install-all.sh +"$rust_stable" "${RELEASE_BASENAME}"

  tar cvf "${TARBALL_BASENAME}"-$TARGET.tar "${RELEASE_BASENAME}"
  bzip2 "${TARBALL_BASENAME}"-$TARGET.tar
  cp "${RELEASE_BASENAME}"/version.yml "${TARBALL_BASENAME}"-$TARGET.yml
)
echo --- ok

View File

@ -1,3 +0,0 @@
#!/bin/bash
# CI environment: names of the release bundle and the plugin cdylib.
# NOTE(review): the other ci scripts read GEYSER_PLUGIN_NAME /
# GEYSER_PLUGIN_LIB, not these variable names — confirm what actually
# consumes these before relying on them.
plugin_name=solana-geyser-sqs-grpc-release
plugin_lib_name=solana_geyser_connector_plugin_grpc

View File

@ -1,137 +0,0 @@
#!/usr/bin/env bash
# Source:
# https://github.com/solana-labs/solana-accountsdb-plugin-postgres/blob/master/ci/rust-version.sh
#
# This file maintains the rust versions for use by CI.
#
# Obtain the environment variables without any automatic toolchain updating:
#   $ source ci/rust-version.sh
#
# Obtain the environment variables updating both stable and nightly, only stable, or
# only nightly:
#   $ source ci/rust-version.sh all
#   $ source ci/rust-version.sh stable
#   $ source ci/rust-version.sh nightly
# Then to build with either stable or nightly:
#   $ cargo +"$rust_stable" build
#   $ cargo +"$rust_nightly" build
#
# Fix: the previous file contained the entire script body twice (the
# second copy was fused onto the closing ")" of the subshell), so every
# toolchain check/install ran twice. Deduplicated to a single copy.

if [[ -n $RUST_STABLE_VERSION ]]; then
  stable_version="$RUST_STABLE_VERSION"
else
  stable_version=1.59.0
fi

if [[ -n $RUST_NIGHTLY_VERSION ]]; then
  nightly_version="$RUST_NIGHTLY_VERSION"
else
  nightly_version=2022-02-24
fi

export rust_stable="$stable_version"
export rust_stable_docker_image=solanalabs/rust:"$stable_version"
export rust_nightly=nightly-"$nightly_version"
export rust_nightly_docker_image=solanalabs/rust-nightly:"$nightly_version"

# With an argument, make sure the requested toolchain(s) are installed.
[[ -z $1 ]] || (
  rustup_install() {
    declare toolchain=$1
    if ! cargo +"$toolchain" -V > /dev/null; then
      echo "$0: Missing toolchain? Installing...: $toolchain" >&2
      rustup install "$toolchain"
      cargo +"$toolchain" -V
    fi
  }

  set -e
  cd "$(dirname "${BASH_SOURCE[0]}")"
  case $1 in
  stable)
    rustup_install "$rust_stable"
    ;;
  nightly)
    rustup_install "$rust_nightly"
    ;;
  all)
    rustup_install "$rust_stable"
    rustup_install "$rust_nightly"
    ;;
  *)
    echo "$0: Note: ignoring unknown argument: $1" >&2
    ;;
  esac
)

View File

@ -1,9 +0,0 @@
#!/usr/bin/env bash
# Prints the Solana version.
# The version is derived from this crate's requirement on the
# solana-geyser-plugin-interface dependency in Cargo.toml, as reported
# by `cargo read-manifest`.
set -e
cd "$(dirname "$0")/.."
cargo read-manifest | jq -r '.dependencies[] | select(.name == "solana-geyser-plugin-interface") | .req'

View File

@ -1,22 +0,0 @@
# Crate manifest for the (currently disabled) mango -> postgres connector.
[package]
name = "solana-geyser-connector-mango"
version = "0.1.0"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2018"

[dependencies]
solana-geyser-connector-lib = { path = "../lib" }
# NOTE(review): pinned to 1.10.35 while the workspace is moving to
# solana 1.14.9 — confirm/update this pin if the crate is re-enabled.
solana-logger = "=1.10.35"
log = "0.4"
anyhow = "1.0"
toml = "0.5"
async-trait = "0.1"
fixed = { version = "1.9.0", features = ["serde"] }
bs58 = "0.3.1"
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7.4"
postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }

View File

@ -1,34 +0,0 @@
# Example connector configuration: account updates are consumed from a
# geyser gRPC server (with an RPC snapshot for the initial state) and
# written to PostgreSQL.
[source]
dedup_queue_size = 50000
rpc_ws_url = ""

[[source.grpc_sources]]
name = "server"
connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30

# Optional TLS settings for the gRPC connection.
#[source.grpc_sources.tls]
#ca_cert_path = "ca.pem"
#client_cert_path = "client.pem"
#client_key_path = "client.pem"
#domain_name = "example.com"

# Initial snapshot of the program's accounts, fetched over HTTP RPC.
[source.snapshot]
rpc_http_url = ""
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"

[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
account_write_max_batch_size = 10
account_write_max_queue_size = 10000
slot_update_connection_count = 4
retry_query_max_count = 3
retry_query_sleep_secs = 5
retry_connection_sleep_secs = 30
fatal_connection_timeout_secs = 600
allow_invalid_certs = false
delete_old_data = true
monitoring_name = "example"
monitoring_update_interval_secs = 30
cleanup_interval_secs = 10

View File

@ -1,217 +0,0 @@
/**
 * This plugin implementation for PostgreSQL requires the following tables
 */

-- Slot commitment levels as reported by the validator.
CREATE TYPE "SlotStatus" AS ENUM (
'Rooted',
'Confirmed',
'Processed'
);

-- One row per connector instance; used for liveness/health monitoring.
CREATE TABLE monitoring (
name TEXT PRIMARY KEY,
last_update TIMESTAMP WITH TIME ZONE,
last_slot_write TIMESTAMP WITH TIME ZONE,
last_account_write_write TIMESTAMP WITH TIME ZONE,
slot_queue BIGINT,
account_write_queue BIGINT
);

-- Interning table: maps base58 pubkeys (<= 44 chars) to compact ids.
CREATE TABLE pubkey (
pubkey_id BIGSERIAL PRIMARY KEY,
pubkey VARCHAR(44) NOT NULL UNIQUE
);
-- Returns a pubkey_id for a pubkey, by getting it from the table or inserting it.
-- Getting this fully correct is complex, see:
-- https://stackoverflow.com/questions/15939902/is-select-or-insert-in-a-function-prone-to-race-conditions/15950324
-- and currently this function assumes there are no deletions in the pubkey table!
CREATE OR REPLACE FUNCTION map_pubkey(_pubkey varchar(44), OUT _pubkey_id bigint)
LANGUAGE plpgsql AS
$func$
BEGIN
LOOP
SELECT pubkey_id
FROM pubkey
WHERE pubkey = _pubkey
INTO _pubkey_id;
EXIT WHEN FOUND;
INSERT INTO pubkey AS t
(pubkey) VALUES (_pubkey)
ON CONFLICT (pubkey) DO NOTHING
RETURNING t.pubkey_id
INTO _pubkey_id;
EXIT WHEN FOUND;
END LOOP;
END
$func$;

-- Array form of map_pubkey(); declared RETURNS NULL ON NULL INPUT so a
-- NULL array maps to NULL instead of invoking the body.
CREATE OR REPLACE FUNCTION map_pubkey_arr(_pubkey_arr varchar(44)[], OUT _pubkey_id_arr bigint[])
LANGUAGE plpgsql AS
$func$
BEGIN
FOR i IN array_lower(_pubkey_arr, 1)..array_upper(_pubkey_arr, 1) LOOP
_pubkey_id_arr[i] := map_pubkey(_pubkey_arr[i]);
END LOOP;
END
$func$
RETURNS NULL ON NULL INPUT;
-- The table storing account writes, keeping only the newest write_version per slot
CREATE TABLE account_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
is_selected BOOL NOT NULL,
owner_id BIGINT REFERENCES pubkey,
lamports BIGINT NOT NULL,
executable BOOL NOT NULL,
rent_epoch BIGINT NOT NULL,
data BYTEA,
PRIMARY KEY (pubkey_id, slot, write_version)
);
-- Descending (slot, write_version) order matches the DISTINCT ON
-- "latest write per pubkey" scans in the views.
CREATE INDEX account_write_searchkey on account_write(pubkey_id, slot DESC, write_version DESC);
CREATE INDEX account_write_pubkey_id_idx on account_write(pubkey_id);

-- The table storing slot information
CREATE TABLE slot (
slot BIGINT PRIMARY KEY,
parent BIGINT,
status "SlotStatus" NOT NULL,
uncle BOOL NOT NULL
);
CREATE INDEX ON slot (parent);
CREATE TYPE "PerpAccount" AS (
base_position INT8,
quote_position NUMERIC, -- I80F48
long_settled_funding NUMERIC, -- I80F48
short_settled_funding NUMERIC, -- I80F48
bids_quantity INT8,
asks_quantity INT8,
taker_base INT8,
taker_quote INT8,
mngo_accrued NUMERIC -- u64
);
CREATE TABLE mango_account_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
version INT2,
is_initialized BOOL,
extra_info BYTEA,
mango_group_id BIGINT REFERENCES pubkey,
owner_id BIGINT REFERENCES pubkey,
in_margin_basket BOOL[],
num_in_margin_basket INT2,
deposits NUMERIC[], -- I80F48[]
borrows NUMERIC[], -- I80F48[]
spot_open_orders_ids BIGINT[],
perp_accounts "PerpAccount"[],
order_market INT2[],
order_side INT2[],
orders NUMERIC[], -- i128[]
client_order_ids NUMERIC[], -- u64[]
msrm_amount NUMERIC, -- u64
being_liquidated BOOL,
is_bankrupt BOOL,
info BYTEA,
advanced_orders_key_id BIGINT REFERENCES pubkey,
padding BYTEA,
PRIMARY KEY (pubkey_id, slot, write_version)
);
CREATE INDEX mango_account_write_searchkey on mango_account_write(pubkey_id, slot DESC, write_version DESC);
CREATE INDEX mango_account_write_pubkey_id_idx on mango_account_write(pubkey_id);
CREATE TYPE "TokenInfo" AS (
mint varchar(44), -- TODO: also use pubkey table? but is unergonomic
root_bank varchar(44),
decimals INT2,
padding BYTEA
);
CREATE TYPE "SpotMarketInfo" AS (
spot_market varchar(44),
maint_asset_weight NUMERIC, -- all I80F48
init_asset_weight NUMERIC,
maint_liab_weight NUMERIC,
init_liab_weight NUMERIC,
liquidation_fee NUMERIC
);
CREATE TYPE "PerpMarketInfo" AS (
perp_market varchar(44),
maint_asset_weight NUMERIC, -- all I80F48
init_asset_weight NUMERIC,
maint_liab_weight NUMERIC,
init_liab_weight NUMERIC,
liquidation_fee NUMERIC,
maker_fee NUMERIC,
taker_fee NUMERIC,
base_lot_size INT8,
quote_lot_size INT8
);
CREATE TABLE mango_group_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
version INT2,
is_initialized BOOL,
extra_info BYTEA,
num_oracles INT8, -- technically usize, but it's fine
tokens "TokenInfo"[],
spot_markets "SpotMarketInfo"[],
perp_markets "PerpMarketInfo"[],
oracle_ids BIGINT[],
signer_nonce NUMERIC, -- u64
signer_key_id BIGINT REFERENCES pubkey,
admin_id BIGINT REFERENCES pubkey,
dex_program_id BIGINT REFERENCES pubkey,
mango_cache_id BIGINT REFERENCES pubkey,
valid_interval NUMERIC, -- u64
insurance_vault_id BIGINT REFERENCES pubkey,
srm_vault_id BIGINT REFERENCES pubkey,
msrm_vault_id BIGINT REFERENCES pubkey,
fees_vault_id BIGINT REFERENCES pubkey,
padding BYTEA,
PRIMARY KEY (pubkey_id, slot, write_version)
);
CREATE INDEX mango_group_write_searchkey on mango_group_write(pubkey_id, slot DESC, write_version DESC);
CREATE INDEX mango_group_write_pubkey_id_idx on mango_group_write(pubkey_id);
CREATE TYPE "PriceCache" AS (
price NUMERIC, -- I80F48
last_update NUMERIC -- u64
);
CREATE TYPE "RootBankCache" AS (
deposit_index NUMERIC, -- I80F48
borrow_index NUMERIC, -- I80F48
last_update NUMERIC -- u64
);
CREATE TYPE "PerpMarketCache" AS (
long_funding NUMERIC, -- I80F48
short_funding NUMERIC, -- I80F48
last_update NUMERIC -- u64
);
CREATE TABLE mango_cache_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
version INT2,
is_initialized BOOL,
extra_info BYTEA,
price_cache "PriceCache"[],
root_bank_cache "RootBankCache"[],
perp_market_cache "PerpMarketCache"[],
PRIMARY KEY (pubkey_id, slot, write_version)
);
CREATE INDEX mango_cache_write_searchkey on mango_cache_write(pubkey_id, slot DESC, write_version DESC);
CREATE INDEX mango_cache_write_pubkey_id_idx on mango_cache_write(pubkey_id);

View File

@ -1,190 +0,0 @@
-- Views for raw accounts
--
-- Each *_rooted/_confirmed/_processed view selects, per pubkey, the
-- newest (slot, write_version) write visible at that commitment level
-- (writes on uncled slots are excluded where applicable), then joins
-- the base58 pubkey back in.
CREATE VIEW account_rooted AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW account_confirmed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW account_processed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
-- Latest mango account writes at each commitment level; joined against
-- account_write to pick up the is_selected flag for the same write.
CREATE VIEW mango_account_rooted AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_account_confirmed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_account_processed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

-- Unnests the parallel deposit/borrow arrays into one row per token.
-- NOTE(review): the token-name list is hard-coded to one particular
-- group's token order — confirm it matches the target mango group.
CREATE VIEW mango_account_processed_balance AS
SELECT
pubkey,
unnest(array['MNGO', 'BTC', 'ETH', 'SOL', 'USDT', 'SRM', 'RAY', 'COPE', 'FTT', 'ADA', 'unused10', 'unused11', 'unused12', 'unused13', 'unused14', 'USDC']) as token,
unnest(deposits) as deposit,
unnest(borrows) as borrow
FROM mango_account_processed;

-- One row per (account, perp market), expanding the PerpAccount fields.
CREATE VIEW mango_account_processed_perp AS
SELECT
pubkey,
perp,
(q.perp_account).*
FROM (
SELECT
pubkey,
unnest(array['MNGO', 'BTC', 'ETH', 'SOL', 'unused_USDT', 'SRM', 'RAY', 'unused_COPE', 'FTT', 'ADA', 'unused10', 'unused11', 'unused12', 'unused13', 'unused14']) as perp,
unnest(perp_accounts) as perp_account
FROM mango_account_processed
) q;
-- Latest mango group writes at each commitment level (same pattern as
-- the mango_account_* views).
CREATE VIEW mango_group_rooted AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_group_confirmed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_group_processed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
-- Latest mango cache writes at each commitment level (same pattern as
-- the mango_account_* views).
CREATE VIEW mango_cache_rooted AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_cache_confirmed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

CREATE VIEW mango_cache_processed AS
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

View File

@ -1,21 +0,0 @@
/**
 * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
 */
-- Drop the views first (see the drop-views script); CASCADE then takes
-- care of any remaining dependents.
DROP TABLE monitoring CASCADE;
DROP TABLE slot CASCADE;
DROP TABLE account_write CASCADE;
DROP TABLE pubkey CASCADE;
DROP TYPE "SlotStatus";

-- Mango-specific tables and their composite types.
DROP TABLE mango_group_write CASCADE;
DROP TABLE mango_cache_write CASCADE;
DROP TABLE mango_account_write CASCADE;
DROP TYPE "PerpAccount";
DROP TYPE "TokenInfo";
DROP TYPE "SpotMarketInfo";
DROP TYPE "PerpMarketInfo";
DROP TYPE "PriceCache";
DROP TYPE "RootBankCache";
DROP TYPE "PerpMarketCache";

View File

@ -1,18 +0,0 @@
-- Drops every view created by the create-views script. Dependent views
-- (the balance/perp unnest views) are dropped before their base view.
DROP VIEW account_rooted;
DROP VIEW account_confirmed;
DROP VIEW account_processed;
DROP VIEW mango_account_processed_balance;
DROP VIEW mango_account_processed_perp;
DROP VIEW mango_account_rooted;
DROP VIEW mango_account_confirmed;
DROP VIEW mango_account_processed;
DROP VIEW mango_group_rooted;
DROP VIEW mango_group_confirmed;
DROP VIEW mango_group_processed;
DROP VIEW mango_cache_rooted;
DROP VIEW mango_cache_confirmed;
DROP VIEW mango_cache_processed;

View File

@ -1,51 +0,0 @@
-- Per-account PnL for one perp market, from the latest processed
-- group/cache/account snapshots:
--   pnl = base_position * contract_size * price
--         + quote_position
--         - unsettled funding (long or short side, per sign of base)
-- market_index and the group pubkey are fixed in the `inputs` CTE.
with
inputs as (select
1 as market_index,
(select pubkey_id from pubkey
where pubkey = '98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue')
as group_pubkey_id
),
-- The group row plus its cache row (linked via mango_cache_id).
group_and_cache as (select
mango_group,
mango_cache
from
inputs,
mango_group_processed mango_group,
mango_cache_processed mango_cache
where
mango_group.pubkey_id = inputs.group_pubkey_id
and mango_cache.pubkey_id = mango_group.mango_cache_id
),
-- Market parameters: lot size, oracle price, funding cache.
params as (select
(mango_group).perp_markets[market_index].base_lot_size
as contract_size,
(mango_cache).price_cache[market_index].price as price,
(mango_cache).perp_market_cache[market_index] as pmc
from
group_and_cache,
inputs
),
-- Every account's PerpAccount entry for this market.
market_perp_accounts as (select
pubkey, (perp_accounts[market_index]).*
from
mango_account_processed,
inputs
where
mango_group_id = group_pubkey_id
)
select
pubkey,
base_position * contract_size * price +
quote_position -
case
when base_position > 0 then
((pmc).long_funding - long_settled_funding) * base_position
when base_position < 0 then
((pmc).short_funding - short_settled_funding) * base_position
else 0
end
as pnl
from
market_perp_accounts,
params
order by pnl desc;

View File

@ -1,59 +0,0 @@
mod mango;
use {
log::*,
solana_geyser_connector_lib::*,
std::{fs::File, io::Read, sync::Arc},
};
/// Service entry point: loads the TOML config named on the command
/// line, initializes the postgres writer queues, and streams account
/// and slot updates into them from the geyser gRPC source.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // A config-file path is required as the first CLI argument.
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        println!("requires a config file argument");
        return Ok(());
    }

    // Read and parse the TOML configuration file.
    let config: Config = {
        let mut raw = String::new();
        File::open(&args[1])?.read_to_string(&mut raw)?;
        toml::from_str(&raw).unwrap()
    };

    solana_logger::setup_with_default("info");
    info!("startup");

    // Table writers: raw account bytes plus the decoded mango tables.
    let tables: AccountTables = vec![
        Arc::new(RawAccountTable {}),
        Arc::new(mango::MangoAccountTable {}),
        Arc::new(mango::MangoGroupTable {}),
        Arc::new(mango::MangoCacheTable {}),
    ];

    let metrics_tx = metrics::start(config.metrics, "connector-mango".into());

    let (account_write_sender, slot_sender) =
        postgres_target::init(&config.postgres_target, tables, metrics_tx.clone()).await?;
    info!("postgres done");

    // The websocket source is kept as a reference implementation but
    // stays disabled.
    const USE_GEYSER: bool = true;
    if USE_GEYSER {
        grpc_plugin_source::process_events(
            &config.source,
            account_write_sender,
            slot_sender,
            metrics_tx,
        )
        .await;
    } else {
        websocket_source::process_events(&config.source, account_write_sender, slot_sender).await;
    }
    Ok(())
}

View File

@ -1,426 +0,0 @@
use {
async_trait::async_trait,
mango::state::{DataType, MangoAccount, MangoCache, MangoGroup},
mango_common::Loadable,
postgres_types::ToSql,
std::mem,
};
use crate::{encode_address, postgres_types_numeric::*, AccountTable, AccountWrite};
/// Row payload mirroring the Postgres composite type "PerpAccount"
/// (see the schema); the Sql* wrappers serialize fixed-point / u64
/// values as NUMERIC.
#[derive(Debug, ToSql)]
struct PerpAccount {
    base_position: i64,
    quote_position: SqlNumericI80F48,
    long_settled_funding: SqlNumericI80F48,
    short_settled_funding: SqlNumericI80F48,
    bids_quantity: i64,
    asks_quantity: i64,
    taker_base: i64,
    taker_quote: i64,
    mngo_accrued: SqlNumericU64,
}
/// Decodes MangoAccount account data and writes it to the
/// `mango_account_write` table.
pub struct MangoAccountTable {}

#[async_trait]
impl AccountTable for MangoAccountTable {
    fn table_name(&self) -> &str {
        "mango_account_write"
    }

    /// Inserts one account write; silently skips accounts that are not
    /// MangoAccount data. Duplicate (pubkey, slot, write_version) rows
    /// are ignored via ON CONFLICT.
    async fn insert_account_write(
        &self,
        client: &postgres_query::Caching<tokio_postgres::Client>,
        account_write: &AccountWrite,
    ) -> anyhow::Result<()> {
        // Filter: exact struct size and the DataType discriminant in
        // byte 0 must both match.
        if account_write.data.len() != mem::size_of::<MangoAccount>()
            || account_write.data[0] != DataType::MangoAccount as u8
        {
            return Ok(());
        }

        // TODO: Also filter on mango_group?
        let pubkey = encode_address(&account_write.pubkey);
        let data = MangoAccount::load_from_bytes(&account_write.data)?;
        let slot = account_write.slot as i64;
        let write_version = account_write.write_version as i64;

        let owner = encode_address(&data.owner);
        let mango_group = encode_address(&data.mango_group);
        let version = data.meta_data.version as i16;
        let extra_info = &data.meta_data.extra_info as &[u8];
        let in_margin_basket = &data.in_margin_basket as &[bool];
        let num_in_margin_basket = data.num_in_margin_basket as i16;
        // Fixed-point and wide-integer arrays are wrapped into the Sql*
        // NUMERIC adapter types for postgres.
        let deposits = data
            .deposits
            .iter()
            .map(|v| SqlNumericI80F48(*v))
            .collect::<Vec<SqlNumericI80F48>>();
        let borrows = data
            .borrows
            .iter()
            .map(|v| SqlNumericI80F48(*v))
            .collect::<Vec<SqlNumericI80F48>>();
        let spot_open_orders = data
            .spot_open_orders
            .iter()
            .map(|key| encode_address(key))
            .collect::<Vec<String>>();
        let perp_accounts = data
            .perp_accounts
            .iter()
            .map(|perp| PerpAccount {
                base_position: perp.base_position,
                quote_position: SqlNumericI80F48(perp.quote_position),
                long_settled_funding: SqlNumericI80F48(perp.long_settled_funding),
                short_settled_funding: SqlNumericI80F48(perp.short_settled_funding),
                bids_quantity: perp.bids_quantity,
                asks_quantity: perp.asks_quantity,
                taker_base: perp.taker_base,
                taker_quote: perp.taker_quote,
                mngo_accrued: SqlNumericU64(perp.mngo_accrued),
            })
            .collect::<Vec<PerpAccount>>();
        let order_market = data
            .order_market
            .iter()
            .map(|v| *v as i16)
            .collect::<Vec<i16>>();
        let order_side = data
            .order_side
            .iter()
            .map(|v| *v as i16)
            .collect::<Vec<i16>>();
        let orders = data
            .orders
            .iter()
            .map(|v| SqlNumericI128(*v))
            .collect::<Vec<SqlNumericI128>>();
        let client_order_ids = data
            .client_order_ids
            .iter()
            .map(|v| SqlNumericU64(*v))
            .collect::<Vec<SqlNumericU64>>();
        let msrm_amount = SqlNumericU64(data.msrm_amount);
        let info = &data.info as &[u8];
        let advanced_orders_key = encode_address(&data.advanced_orders_key);
        let padding = &data.padding as &[u8];

        // map_pubkey()/map_pubkey_arr() intern base58 pubkeys into ids
        // on the server side.
        let query = postgres_query::query!(
            "
INSERT INTO mango_account_write
(pubkey_id, slot, write_version,
version, is_initialized, extra_info, mango_group_id,
owner_id, in_margin_basket, num_in_margin_basket, deposits,
borrows, spot_open_orders_ids, perp_accounts, order_market,
order_side, orders, client_order_ids,
msrm_amount, being_liquidated, is_bankrupt, info,
advanced_orders_key_id, padding
)
VALUES
(map_pubkey($pubkey), $slot, $write_version,
$version, $is_initialized, $extra_info, map_pubkey($mango_group),
map_pubkey($owner), $in_margin_basket, $num_in_margin_basket, $deposits,
$borrows, map_pubkey_arr($spot_open_orders), $perp_accounts, $order_market,
$order_side, $orders, $client_order_ids,
$msrm_amount, $being_liquidated, $is_bankrupt, $info,
map_pubkey($advanced_orders_key), $padding
)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
            pubkey,
            slot,
            write_version,
            version,
            is_initialized = data.meta_data.is_initialized,
            extra_info,
            mango_group,
            owner,
            in_margin_basket,
            num_in_margin_basket,
            deposits,
            borrows,
            spot_open_orders,
            perp_accounts,
            order_market,
            order_side,
            orders,
            client_order_ids,
            msrm_amount,
            being_liquidated = data.being_liquidated,
            is_bankrupt = data.is_bankrupt,
            info,
            advanced_orders_key,
            padding,
        );
        let _ = query.execute(client).await?;
        Ok(())
    }
}
/// Row payload mirroring the Postgres composite type "TokenInfo".
#[derive(Debug, ToSql)]
struct TokenInfo {
    mint: String,
    root_bank: String,
    decimals: i16,
    padding: Vec<u8>,
}
/// Row payload mirroring the Postgres composite type "SpotMarketInfo";
/// all weights/fees are I80F48 fixed-point serialized as NUMERIC.
#[derive(Debug, ToSql)]
struct SpotMarketInfo {
    spot_market: String,
    maint_asset_weight: SqlNumericI80F48,
    init_asset_weight: SqlNumericI80F48,
    maint_liab_weight: SqlNumericI80F48,
    init_liab_weight: SqlNumericI80F48,
    liquidation_fee: SqlNumericI80F48,
}
/// Row payload mirroring the Postgres composite type "PerpMarketInfo".
#[derive(Debug, ToSql)]
struct PerpMarketInfo {
    perp_market: String,
    maint_asset_weight: SqlNumericI80F48,
    init_asset_weight: SqlNumericI80F48,
    maint_liab_weight: SqlNumericI80F48,
    init_liab_weight: SqlNumericI80F48,
    liquidation_fee: SqlNumericI80F48,
    maker_fee: SqlNumericI80F48,
    taker_fee: SqlNumericI80F48,
    base_lot_size: i64,
    quote_lot_size: i64,
}
/// Decodes MangoGroup account data and writes it to the
/// `mango_group_write` table.
pub struct MangoGroupTable {}

#[async_trait]
impl AccountTable for MangoGroupTable {
    fn table_name(&self) -> &str {
        "mango_group_write"
    }

    /// Inserts one account write; silently skips accounts that are not
    /// MangoGroup data. Duplicate (pubkey, slot, write_version) rows
    /// are ignored via ON CONFLICT.
    async fn insert_account_write(
        &self,
        client: &postgres_query::Caching<tokio_postgres::Client>,
        account_write: &AccountWrite,
    ) -> anyhow::Result<()> {
        // Filter: exact struct size and the DataType discriminant in
        // byte 0 must both match.
        if account_write.data.len() != mem::size_of::<MangoGroup>()
            || account_write.data[0] != DataType::MangoGroup as u8
        {
            return Ok(());
        }

        // TODO: Also filter on mango_group pubkey?
        let pubkey = encode_address(&account_write.pubkey);
        let data = MangoGroup::load_from_bytes(&account_write.data)?;
        let slot = account_write.slot as i64;
        let write_version = account_write.write_version as i64;

        let version = data.meta_data.version as i16;
        let extra_info = &data.meta_data.extra_info as &[u8];
        let num_oracles = data.num_oracles as i64;
        // Embedded struct arrays become the matching composite types.
        let tokens = data
            .tokens
            .iter()
            .map(|token| TokenInfo {
                mint: encode_address(&token.mint),
                root_bank: encode_address(&token.root_bank),
                decimals: token.decimals as i16,
                padding: token.padding.to_vec(),
            })
            .collect::<Vec<TokenInfo>>();
        let spot_markets = data
            .spot_markets
            .iter()
            .map(|market| SpotMarketInfo {
                spot_market: encode_address(&market.spot_market),
                maint_asset_weight: SqlNumericI80F48(market.maint_asset_weight),
                init_asset_weight: SqlNumericI80F48(market.init_asset_weight),
                maint_liab_weight: SqlNumericI80F48(market.maint_liab_weight),
                init_liab_weight: SqlNumericI80F48(market.init_liab_weight),
                liquidation_fee: SqlNumericI80F48(market.liquidation_fee),
            })
            .collect::<Vec<SpotMarketInfo>>();
        let perp_markets = data
            .perp_markets
            .iter()
            .map(|market| PerpMarketInfo {
                perp_market: encode_address(&market.perp_market),
                maint_asset_weight: SqlNumericI80F48(market.maint_asset_weight),
                init_asset_weight: SqlNumericI80F48(market.init_asset_weight),
                maint_liab_weight: SqlNumericI80F48(market.maint_liab_weight),
                init_liab_weight: SqlNumericI80F48(market.init_liab_weight),
                liquidation_fee: SqlNumericI80F48(market.liquidation_fee),
                maker_fee: SqlNumericI80F48(market.maker_fee),
                taker_fee: SqlNumericI80F48(market.taker_fee),
                base_lot_size: market.base_lot_size,
                quote_lot_size: market.quote_lot_size,
            })
            .collect::<Vec<PerpMarketInfo>>();
        let oracles = data
            .oracles
            .iter()
            .map(|key| encode_address(key))
            .collect::<Vec<String>>();
        let signer_nonce = SqlNumericU64(data.signer_nonce);
        let signer_key = encode_address(&data.signer_key);
        let admin = encode_address(&data.admin);
        let dex_program_id = encode_address(&data.dex_program_id);
        let mango_cache = encode_address(&data.mango_cache);
        let valid_interval = SqlNumericU64(data.valid_interval);
        let insurance_vault = encode_address(&data.insurance_vault);
        let srm_vault = encode_address(&data.srm_vault);
        let msrm_vault = encode_address(&data.msrm_vault);
        let fees_vault = encode_address(&data.fees_vault);
        let padding = &data.padding as &[u8];

        // map_pubkey()/map_pubkey_arr() intern base58 pubkeys into ids
        // on the server side.
        let query = postgres_query::query!(
            "
INSERT INTO mango_group_write
(pubkey_id, slot, write_version,
version, is_initialized, extra_info,
num_oracles,
tokens,
spot_markets,
perp_markets,
oracle_ids, signer_nonce, signer_key_id, admin_id,
dex_program_id, mango_cache_id, valid_interval,
insurance_vault_id, srm_vault_id, msrm_vault_id,
fees_vault_id,
padding)
VALUES
(map_pubkey($pubkey), $slot, $write_version,
$version, $is_initialized, $extra_info,
$num_oracles,
$tokens,
$spot_markets,
$perp_markets,
map_pubkey_arr($oracles), $signer_nonce, map_pubkey($signer_key), map_pubkey($admin),
map_pubkey($dex_program_id), map_pubkey($mango_cache), $valid_interval,
map_pubkey($insurance_vault), map_pubkey($srm_vault), map_pubkey($msrm_vault),
map_pubkey($fees_vault),
$padding)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
            pubkey,
            slot,
            write_version,
            version,
            is_initialized = data.meta_data.is_initialized,
            extra_info,
            num_oracles,
            tokens,
            spot_markets,
            perp_markets,
            oracles,
            signer_nonce,
            signer_key,
            admin,
            dex_program_id,
            mango_cache,
            valid_interval,
            insurance_vault,
            srm_vault,
            msrm_vault,
            fees_vault,
            padding,
        );
        let _ = query.execute(client).await?;
        Ok(())
    }
}
/// SQL composite mirroring one entry of MangoCache's price cache: an I80F48
/// price plus the cache's last_update counter, wrapped for Postgres encoding.
#[derive(Debug, ToSql)]
struct PriceCache {
    price: SqlNumericI80F48,
    // NOTE(review): presumably a unix timestamp or slot — confirm against mango source.
    last_update: SqlNumericU64,
}
/// SQL composite mirroring one entry of MangoCache's root bank cache:
/// deposit/borrow indexes (I80F48) plus the last_update counter.
#[derive(Debug, ToSql)]
struct RootBankCache {
    deposit_index: SqlNumericI80F48,
    borrow_index: SqlNumericI80F48,
    // NOTE(review): presumably a unix timestamp or slot — confirm against mango source.
    last_update: SqlNumericU64,
}
/// SQL composite mirroring one entry of MangoCache's perp market cache:
/// long/short funding values (I80F48) plus the last_update counter.
#[derive(Debug, ToSql)]
struct PerpMarketCache {
    long_funding: SqlNumericI80F48,
    short_funding: SqlNumericI80F48,
    // NOTE(review): presumably a unix timestamp or slot — confirm against mango source.
    last_update: SqlNumericU64,
}
/// AccountTable implementation persisting MangoCache account writes into the
/// `mango_cache_write` table.
pub struct MangoCacheTable {}
#[async_trait]
impl AccountTable for MangoCacheTable {
    /// Target table for MangoCache account writes.
    fn table_name(&self) -> &str {
        "mango_cache_write"
    }

    /// Decodes an account write as a MangoCache and inserts one row for it.
    ///
    /// Writes whose data length or leading DataType tag byte don't match
    /// MangoCache are silently skipped (returns Ok). Re-delivered writes are
    /// deduplicated by the ON CONFLICT clause on (pubkey, slot, write_version).
    async fn insert_account_write(
        &self,
        client: &postgres_query::Caching<tokio_postgres::Client>,
        account_write: &AccountWrite,
    ) -> anyhow::Result<()> {
        // Cheap structural filter: exact struct size and discriminator byte.
        if account_write.data.len() != mem::size_of::<MangoCache>()
            || account_write.data[0] != DataType::MangoCache as u8
        {
            return Ok(());
        }
        // TODO: This one can't be filtered to only use the one for our mango_group?
        let pubkey = encode_address(&account_write.pubkey);
        let data = MangoCache::load_from_bytes(&account_write.data)?;
        let slot = account_write.slot as i64;
        let write_version = account_write.write_version as i64;

        let version = data.meta_data.version as i16;
        let extra_info = &data.meta_data.extra_info as &[u8];

        // Convert the fixed-size cache arrays into SQL composite rows.
        let price_cache = data
            .price_cache
            .iter()
            .map(|cache| PriceCache {
                price: SqlNumericI80F48(cache.price),
                last_update: SqlNumericU64(cache.last_update),
            })
            .collect::<Vec<PriceCache>>();
        let root_bank_cache = data
            .root_bank_cache
            .iter()
            .map(|cache| RootBankCache {
                deposit_index: SqlNumericI80F48(cache.deposit_index),
                borrow_index: SqlNumericI80F48(cache.borrow_index),
                last_update: SqlNumericU64(cache.last_update),
            })
            .collect::<Vec<RootBankCache>>();
        let perp_market_cache = data
            .perp_market_cache
            .iter()
            .map(|cache| PerpMarketCache {
                long_funding: SqlNumericI80F48(cache.long_funding),
                short_funding: SqlNumericI80F48(cache.short_funding),
                last_update: SqlNumericU64(cache.last_update),
            })
            .collect::<Vec<PerpMarketCache>>();

        // map_pubkey() interns the base58 key into the pubkey table.
        let query = postgres_query::query!(
            "
            INSERT INTO mango_cache_write
            (pubkey_id, slot, write_version,
            version, is_initialized, extra_info,
            price_cache, root_bank_cache, perp_market_cache)
            VALUES
            (map_pubkey($pubkey), $slot, $write_version,
            $version, $is_initialized, $extra_info,
            $price_cache, $root_bank_cache, $perp_market_cache)
            ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
            pubkey,
            slot,
            write_version,
            version,
            is_initialized = data.meta_data.is_initialized,
            extra_info,
            price_cache,
            root_bank_cache,
            perp_market_cache,
        );
        let _ = query.execute(client).await?;
        Ok(())
    }
}

View File

@ -1,13 +0,0 @@
# Binary crate: feeds raw (unparsed) account writes from a geyser source into Postgres.
[package]
name = "solana-geyser-connector-raw"
version = "0.1.0"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2018"
[dependencies]
# Shared connector plumbing (sources, postgres target, metrics).
solana-geyser-connector-lib = { path = "../lib" }
# NOTE(review): pinned solana version — keep in sync with the plugin crates.
solana-logger = "=1.10.35"
log = "0.4"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
toml = "0.5"

View File

@ -1,35 +0,0 @@
# Example configuration for the raw geyser -> Postgres connector.
[source]
# Capacity of the queue used to deduplicate updates across sources.
dedup_queue_size = 50000
# Websocket RPC endpoint (left empty in this example).
rpc_ws_url = ""
# One entry per geyser gRPC plugin endpoint to pull updates from.
[[source.grpc_sources]]
name = "server"
connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30
# Uncomment to enable TLS towards the grpc source:
#[source.grpc_sources.tls]
#ca_cert_path = "ca.pem"
#client_cert_path = "client.pem"
#client_key_path = "client.pem"
#domain_name = "example.com"
# HTTP RPC endpoint used for the initial account snapshot of a program.
[source.snapshot]
rpc_http_url = ""
program_id = ""
[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
account_write_max_batch_size = 10
account_write_max_queue_size = 10000
slot_update_connection_count = 2
# Retry/timeout behavior for flaky Postgres connections.
retry_query_max_count = 3
retry_query_sleep_secs = 5
retry_connection_sleep_secs = 30
fatal_connection_timeout_secs = 600
allow_invalid_certs = false
# NOTE(review): presumably prunes superseded rows periodically — confirm in postgres_target.
delete_old_data = true
monitoring_name = "example"
monitoring_update_interval_secs = 30
cleanup_interval_secs = 10

View File

@ -1,75 +0,0 @@
/**
 * This plugin implementation for PostgreSQL requires the following tables
 */

-- Slot commitment levels as reported by the validator.
CREATE TYPE "SlotStatus" AS ENUM (
    'Rooted',
    'Confirmed',
    'Processed'
);

-- One row per connector instance: heartbeat timestamps and queue depths
-- so external monitoring can alert on a stalled connector.
CREATE TABLE monitoring (
    name TEXT PRIMARY KEY,
    last_update TIMESTAMP WITH TIME ZONE,
    last_slot_write TIMESTAMP WITH TIME ZONE,
    last_account_write_write TIMESTAMP WITH TIME ZONE,
    slot_queue BIGINT,
    account_write_queue BIGINT
);

-- Interning table: maps base58 pubkeys (up to 44 chars) to compact bigint ids.
CREATE TABLE pubkey (
    pubkey_id BIGSERIAL PRIMARY KEY,
    pubkey VARCHAR(44) NOT NULL UNIQUE
);

-- Returns a pubkey_id for a pubkey, by getting it from the table or inserting it.
-- Getting this fully correct is complex, see:
-- https://stackoverflow.com/questions/15939902/is-select-or-insert-in-a-function-prone-to-race-conditions/15950324
-- and currently this function assumes there are no deletions in the pubkey table!
CREATE OR REPLACE FUNCTION map_pubkey(_pubkey varchar(44), OUT _pubkey_id bigint)
    LANGUAGE plpgsql AS
$func$
BEGIN
    LOOP
        -- Fast path: key is usually already interned.
        SELECT pubkey_id
        FROM pubkey
        WHERE pubkey = _pubkey
        INTO _pubkey_id;
        EXIT WHEN FOUND;
        -- Not found: try to insert. A concurrent insert makes this a no-op
        -- (FOUND stays false) and we loop back to the SELECT.
        INSERT INTO pubkey AS t
            (pubkey) VALUES (_pubkey)
        ON CONFLICT (pubkey) DO NOTHING
        RETURNING t.pubkey_id
        INTO _pubkey_id;
        EXIT WHEN FOUND;
    END LOOP;
END
$func$;

-- The table storing account writes, keeping only the newest write_version per slot
CREATE TABLE account_write (
    pubkey_id BIGINT NOT NULL REFERENCES pubkey,
    slot BIGINT NOT NULL,
    write_version BIGINT NOT NULL,
    is_selected BOOL NOT NULL,
    owner_id BIGINT REFERENCES pubkey,
    lamports BIGINT NOT NULL,
    executable BOOL NOT NULL,
    rent_epoch BIGINT NOT NULL,
    data BYTEA,
    PRIMARY KEY (pubkey_id, slot, write_version)
);
-- Supports "newest write for a pubkey at or before slot X" lookups.
CREATE INDEX account_write_searchkey on account_write(pubkey_id, slot DESC, write_version DESC);
CREATE INDEX account_write_pubkey_id_idx on account_write(pubkey_id);

-- The table storing slot information
CREATE TABLE slot (
    slot BIGINT PRIMARY KEY,
    parent BIGINT,
    status "SlotStatus" NOT NULL,
    uncle BOOL NOT NULL
);
CREATE INDEX ON slot (parent);

View File

@ -1,40 +0,0 @@
-- Views for raw accounts
-- Each view returns, per pubkey, the single newest account_write visible at a
-- given commitment level (DISTINCT ON + slot/write_version DESC ordering),
-- joined back to `pubkey` to expose the base58 key. Writes whose slot is not
-- in the slot table (slot.status IS NULL) are always included.

-- Rooted (finalized) state only.
CREATE VIEW account_rooted AS
    SELECT pubkey, latest_writes.* FROM
        (SELECT
            DISTINCT ON(pubkey_id)
            account_write.*
        FROM account_write
        LEFT JOIN slot USING(slot)
        CROSS JOIN (SELECT max(slot) FROM slot) ms
        WHERE slot <= ms.max
            AND (slot.status = 'Rooted' OR slot.status is NULL)
        ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
    LEFT JOIN pubkey USING(pubkey_id)
    WHERE is_selected;

-- Confirmed state: also accepts non-uncle Confirmed slots.
CREATE VIEW account_confirmed AS
    SELECT pubkey, latest_writes.* FROM
        (SELECT
            DISTINCT ON(pubkey_id)
            account_write.*
        FROM account_write
        LEFT JOIN slot USING(slot)
        CROSS JOIN (SELECT max(slot) FROM slot) ms
        WHERE slot <= ms.max
            AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
        ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
    LEFT JOIN pubkey USING(pubkey_id)
    WHERE is_selected;

-- Processed state: the most optimistic view, also accepts non-uncle Processed slots.
CREATE VIEW account_processed AS
    SELECT pubkey, latest_writes.* FROM
        (SELECT
            DISTINCT ON(pubkey_id)
            account_write.*
        FROM account_write
        LEFT JOIN slot USING(slot)
        CROSS JOIN (SELECT max(slot) FROM slot) ms
        WHERE slot <= ms.max
            AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
        ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
    LEFT JOIN pubkey USING(pubkey_id)
    WHERE is_selected;

View File

@ -1,9 +0,0 @@
/**
 * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
 */

-- CASCADE also removes dependent objects (e.g. views over these tables).
DROP TABLE monitoring CASCADE;
DROP TABLE slot CASCADE;
DROP TABLE account_write CASCADE;
DROP TABLE pubkey CASCADE;
DROP TYPE "SlotStatus";

View File

@ -1,3 +0,0 @@
-- Removes the per-commitment-level account lookup views.
DROP VIEW account_rooted;
DROP VIEW account_confirmed;
DROP VIEW account_processed;

View File

@ -1,52 +0,0 @@
use {
log::*,
solana_geyser_connector_lib::*,
std::{fs::File, io::Read, sync::Arc},
};
/// Entry point for the raw connector: loads the TOML config given as the
/// first CLI argument, wires the Postgres target up with a RawAccountTable,
/// and then pumps events from the configured geyser gRPC source forever.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        println!("requires a config file argument");
        return Ok(());
    }

    // Read and parse the config; a malformed TOML file now surfaces as an
    // error through anyhow instead of panicking via unwrap().
    let config: Config = {
        let mut file = File::open(&args[1])?;
        let mut contents = String::new();
        file.read_to_string(&mut contents)?;
        toml::from_str(&contents)?
    };

    solana_logger::setup_with_default("info");
    info!("startup");

    let metrics_tx = metrics::start(config.metrics, "connector-raw".into());

    // Store account writes raw (unparsed) — that's this binary's whole job.
    let account_tables: AccountTables = vec![Arc::new(RawAccountTable {})];

    let (account_write_queue_sender, slot_queue_sender) =
        postgres_target::init(&config.postgres_target, account_tables, metrics_tx.clone()).await?;

    info!("postgres done");
    // Hard-coded source selection; the websocket path is kept for debugging.
    let use_geyser = true;
    if use_geyser {
        grpc_plugin_source::process_events(
            &config.source,
            account_write_queue_sender,
            slot_queue_sender,
            metrics_tx,
        )
        .await;
    } else {
        websocket_source::process_events(
            &config.source,
            account_write_queue_sender,
            slot_queue_sender,
        )
        .await;
    }
    Ok(())
}

View File

@ -1,43 +0,0 @@
[package]
name = "solana-geyser-connector-plugin-grpc"
version = "0.1.0"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2018"
# cdylib: the validator dlopen()s the plugin; rlib lets sibling crates link it too.
[lib]
crate-type = ["cdylib", "rlib"]
# Standalone gRPC test server that emits fake slot updates (no validator needed).
[[bin]]
name = "test-server"
path = "src/test_server.rs"
[dependencies]
bs58 = "0.4.0"
log = "0.4.14"
serde = "1.0.130"
serde_derive = "1.0.103"
serde_json = "1.0.67"
# NOTE(review): solana crates are pinned with "=" — presumably the plugin must
# match the validator version exactly; keep these four in lockstep.
solana-geyser-plugin-interface = "=1.10.35"
solana-logger = "=1.10.35"
solana-metrics = "=1.10.35"
solana-sdk = "=1.10.35"
tonic = { version = "0.6", features = ["compression"] }
prost = "0.9"
futures-core = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] }
tokio-stream = "0.1"
async-stream = "0.2"
rand = "0.8"
zstd = "0.11.2"
zstd-safe = "5.0.2"
# Generates the gRPC bindings from proto/geyser.proto at build time.
[build-dependencies]
tonic-build = { version = "0.6", features = ["compression"] }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -1,4 +0,0 @@
// Build script: generates the Rust gRPC/protobuf bindings for the shared
// geyser proto definition; failing the build is the only sensible reaction.
fn main() {
    tonic_build::compile_protos("../proto/geyser.proto")
        .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}

View File

@ -1,11 +0,0 @@
{
"libpath": "/path/to/libsolana_accountsdb_connector_plugin_grpc.so",
"accounts_selector" : {
"owners" : ["mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"]
},
"bind_address": "[::1]:10000",
"service_config": {
"broadcast_buffer_size": 10000,
"subscriber_buffer_size": 10000
}
}

View File

@ -1,69 +0,0 @@
use {log::*, std::collections::HashSet};
/// Filter deciding which account writes the plugin forwards to subscribers.
///
/// A write matches when its pubkey is in `accounts`, its owner is in
/// `owners`, or `select_all_accounts` is set (config contained "*").
#[derive(Debug)]
pub(crate) struct AccountsSelector {
    // Decoded pubkey bytes of explicitly selected accounts.
    pub accounts: HashSet<Vec<u8>>,
    // Decoded pubkey bytes of owner programs whose accounts are selected.
    pub owners: HashSet<Vec<u8>>,
    // True when the accounts list contained "*": match everything.
    pub select_all_accounts: bool,
}
impl AccountsSelector {
    /// A selector that matches every account.
    pub fn default() -> Self {
        Self {
            accounts: HashSet::default(),
            owners: HashSet::default(),
            select_all_accounts: true,
        }
    }

    /// Builds a selector from base58-encoded account and owner keys.
    /// An accounts entry of "*" selects everything.
    pub fn new(accounts: &[String], owners: &[String]) -> Self {
        info!(
            "Creating AccountsSelector from accounts: {:?}, owners: {:?}",
            accounts, owners
        );

        // "*" short-circuits: no need to decode any individual keys.
        if accounts.iter().any(|key| key == "*") {
            return Self {
                accounts: HashSet::default(),
                owners: HashSet::default(),
                select_all_accounts: true,
            };
        }

        // Decode each base58 key to raw bytes; panics on a malformed config key.
        let decode_all = |keys: &[String]| -> HashSet<Vec<u8>> {
            keys.iter()
                .map(|key| bs58::decode(key).into_vec().unwrap())
                .collect()
        };

        Self {
            accounts: decode_all(accounts),
            owners: decode_all(owners),
            select_all_accounts: false,
        }
    }

    /// True when a write with this pubkey/owner should be forwarded.
    pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool {
        self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner)
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    /// Smoke test: constructing selectors from valid base58 keys must not panic.
    #[test]
    fn test_create_accounts_selector() {
        AccountsSelector::new(
            &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()],
            &[],
        );
        AccountsSelector::new(
            &[],
            &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()],
        );
    }
}

View File

@ -1,42 +0,0 @@
use std::io::{Read, Write};
/// Compresses `data` with zstd at level 0 (the library's default level).
///
/// Returns the compressed bytes. Encoder construction errors are now
/// propagated as the declared io::Error instead of panicking via unwrap().
pub fn zstd_compress(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    let mut encoder = zstd::stream::write::Encoder::new(Vec::new(), 0)?;
    encoder.write_all(data)?;
    encoder.finish()
}
/// Decompresses zstd `data`, appending the result to `uncompressed`.
///
/// Returns the number of bytes appended. Decoder construction errors are now
/// propagated as the declared io::Error instead of panicking via unwrap().
pub fn zstd_decompress(data: &[u8], uncompressed: &mut Vec<u8>) -> Result<usize, std::io::Error> {
    let mut decoder = zstd::stream::read::Decoder::new(data)?;
    decoder.read_to_end(uncompressed)
}
// Compile the test module only for test builds, consistent with the
// #[cfg(test)] test modules in the crate's other files.
#[cfg(test)]
pub(crate) mod tests {
    #[allow(unused_imports)]
    use super::*;

    /// Round-trips a buffer through compress/decompress and verifies the
    /// output equals the input (previously this test asserted nothing).
    #[test]
    fn test_zstd_compression() {
        let data = vec![100; 256]; //sample data, 256 bytes of val 100.
        println!("Uncompressed Data = {:?}", data);
        match zstd_compress(&data) {
            Ok(compressed) => {
                println!("Compressed Data = {:?}\n", compressed);
                let mut uncompressed: Vec<u8> = Vec::new();
                match zstd_decompress(&compressed, &mut uncompressed) {
                    Ok(_) => {
                        println!("Uncompressed Data = {:?}\n", uncompressed);
                        assert_eq!(uncompressed, data);
                    }
                    Err(e) => {
                        panic!("Error = {:?}", e);
                    }
                }
            }
            Err(e) => {
                panic!("Error compressing Data {:?}", e);
            }
        }
    }
}

View File

@ -1,411 +0,0 @@
use crate::compression::zstd_compress;
use {
crate::accounts_selector::AccountsSelector,
bs58,
geyser_proto::{
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
SlotUpdate, SubscribeRequest, SubscribeResponse, Update,
},
log::*,
serde_derive::Deserialize,
serde_json,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, Result as PluginResult,
SlotStatus,
},
std::collections::HashSet,
std::convert::TryInto,
std::sync::atomic::{AtomicU64, Ordering},
std::sync::RwLock,
std::{fs::File, io::Read, sync::Arc},
tokio::sync::{broadcast, mpsc},
tonic::transport::Server,
};
/// Protobuf/tonic-generated types for the `accountsdb` gRPC package
/// (generated at build time from proto/geyser.proto, see build.rs).
pub mod geyser_proto {
    tonic::include_proto!("accountsdb");
}
/// The gRPC service: fans `Update` messages out to any number of subscribers
/// via a shared broadcast channel.
pub mod geyser_service {
    use super::*;
    use {
        geyser_proto::accounts_db_server::AccountsDb,
        tokio_stream::wrappers::ReceiverStream,
        tonic::{Code, Request, Response, Status},
    };

    /// Buffer sizes for the fan-out, taken from the plugin's JSON config.
    #[derive(Clone, Debug, Deserialize)]
    pub struct ServiceConfig {
        // Capacity of the shared broadcast channel feeding all subscribers.
        broadcast_buffer_size: usize,
        // Capacity of each subscriber's private mpsc channel.
        subscriber_buffer_size: usize,
    }

    /// Server state; the plugin pushes updates into `sender`.
    #[derive(Debug)]
    pub struct Service {
        pub sender: broadcast::Sender<Update>,
        pub config: ServiceConfig,
        // Largest slot an account write was seen for; reported to new subscribers.
        pub highest_write_slot: Arc<AtomicU64>,
    }

    impl Service {
        /// Creates the service and its broadcast channel.
        pub fn new(config: ServiceConfig, highest_write_slot: Arc<AtomicU64>) -> Self {
            let (tx, _) = broadcast::channel(config.broadcast_buffer_size);
            Self {
                sender: tx,
                config,
                highest_write_slot,
            }
        }
    }

    #[tonic::async_trait]
    impl AccountsDb for Service {
        type SubscribeStream = ReceiverStream<Result<Update, Status>>;

        /// Registers a subscriber: first sends a SubscribeResponse carrying the
        /// highest seen write slot, then forwards broadcast updates until the
        /// subscriber disconnects or falls behind (Lagged).
        async fn subscribe(
            &self,
            _request: Request<SubscribeRequest>,
        ) -> Result<Response<Self::SubscribeStream>, Status> {
            info!("new subscriber");
            let (tx, rx) = mpsc::channel(self.config.subscriber_buffer_size);
            let mut broadcast_rx = self.sender.subscribe();

            // Initial handshake message so the client knows where to resume from.
            tx.send(Ok(Update {
                update_oneof: Some(UpdateOneof::SubscribeResponse(SubscribeResponse {
                    highest_write_slot: self.highest_write_slot.load(Ordering::SeqCst),
                })),
            }))
            .await
            .unwrap();

            // Per-subscriber forwarding task.
            tokio::spawn(async move {
                let mut exit = false;
                while !exit {
                    let fwd = broadcast_rx.recv().await.map_err(|err| {
                        // Note: If we can't keep up pulling from the broadcast
                        // channel here, there'll be a Lagged error, and we'll
                        // close the connection because data was lost.
                        warn!("error while receiving message to be broadcast: {:?}", err);
                        exit = true;
                        Status::new(Code::Internal, err.to_string())
                    });
                    if let Err(_err) = tx.send(fwd).await {
                        info!("subscriber stream closed");
                        exit = true;
                    }
                }
            });
            Ok(Response::new(ReceiverStream::new(rx)))
        }
    }
}
/// Runtime state created by on_load() and torn down by on_unload().
pub struct PluginData {
    // Tokio runtime hosting the gRPC server; Some until on_unload takes it.
    runtime: Option<tokio::runtime::Runtime>,
    // Broadcast side of the update fan-out to all gRPC subscribers.
    server_broadcast: broadcast::Sender<Update>,
    // Signals the gRPC server to shut down; consumed (once) by on_unload.
    server_exit_sender: Option<broadcast::Sender<()>>,
    // Config-driven filter deciding which account writes are forwarded.
    accounts_selector: AccountsSelector,
    /// Largest slot that an account write was processed for
    highest_write_slot: Arc<AtomicU64>,
    /// Accounts that saw account writes
    ///
    /// Needed to catch writes that signal account closure, where
    /// lamports=0 and owner=system-program.
    active_accounts: RwLock<HashSet<[u8; 32]>>,
    // Whether account data is zstd-compressed before broadcasting.
    zstd_compression: bool,
}
/// The plugin entry type handed to the validator via _create_plugin().
#[derive(Default)]
pub struct Plugin {
    // initialized by on_load(); None before load and after unload
    data: Option<PluginData>,
}
// Deliberately prints nothing: the internals (runtime, channels) are not
// meaningfully printable. NOTE(review): presumably Debug is required by the
// GeyserPlugin trait bound — confirm against the plugin interface crate.
impl std::fmt::Debug for Plugin {
    fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}
/// Plugin settings parsed from the JSON config file passed to on_load().
#[derive(Clone, Debug, Deserialize)]
pub struct PluginConfig {
    // Address the gRPC server listens on, e.g. "[::1]:10000".
    pub bind_address: String,
    // Channel capacities for the broadcast service.
    pub service_config: geyser_service::ServiceConfig,
    // If true, account data is zstd-compressed before broadcasting.
    pub zstd_compression: bool,
}
impl PluginData {
    /// Fans a single update out to all connected gRPC subscribers.
    fn broadcast(&self, update: UpdateOneof) {
        let message = Update {
            update_oneof: Some(update),
        };
        // A send error only means there are currently no receivers — ignore it.
        let _ignored = self.server_broadcast.send(message);
    }
}
impl GeyserPlugin for Plugin {
fn name(&self) -> &'static str {
"GeyserPluginGrpc"
}
fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
solana_logger::setup_with_default("info");
info!(
"Loading plugin {:?} from config_file {:?}",
self.name(),
config_file
);
let mut file = File::open(config_file)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let result: serde_json::Value = serde_json::from_str(&contents).unwrap();
let accounts_selector = Self::create_accounts_selector_from_config(&result);
let config: PluginConfig = serde_json::from_str(&contents).map_err(|err| {
GeyserPluginError::ConfigFileReadError {
msg: format!(
"The config file is not in the JSON format expected: {:?}",
err
),
}
})?;
let addr =
config
.bind_address
.parse()
.map_err(|err| GeyserPluginError::ConfigFileReadError {
msg: format!("Error parsing the bind_address {:?}", err),
})?;
let highest_write_slot = Arc::new(AtomicU64::new(0));
let service =
geyser_service::Service::new(config.service_config, highest_write_slot.clone());
let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1);
let server_broadcast = service.sender.clone();
let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service)
.accept_gzip()
.send_gzip();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(Server::builder().add_service(server).serve_with_shutdown(
addr,
async move {
let _ = server_exit_receiver.recv().await;
},
));
let server_broadcast_c = server_broadcast.clone();
let mut server_exit_receiver = server_exit_sender.subscribe();
runtime.spawn(async move {
loop {
// Don't care about the error if there are no receivers.
let _ = server_broadcast_c.send(Update {
update_oneof: Some(UpdateOneof::Ping(Ping {})),
});
tokio::select! {
_ = server_exit_receiver.recv() => { break; },
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {},
}
}
});
self.data = Some(PluginData {
runtime: Some(runtime),
server_broadcast,
server_exit_sender: Some(server_exit_sender),
accounts_selector,
highest_write_slot,
active_accounts: RwLock::new(HashSet::new()),
zstd_compression: config.zstd_compression,
});
Ok(())
}
fn on_unload(&mut self) {
info!("Unloading plugin: {:?}", self.name());
let mut data = self.data.take().expect("plugin must be initialized");
data.server_exit_sender
.take()
.expect("on_unload can only be called once")
.send(())
.expect("sending grpc server termination should succeed");
data.runtime
.take()
.expect("must exist")
.shutdown_background();
}
fn update_account(
&mut self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");
match account {
ReplicaAccountInfoVersions::V0_0_1(account) => {
if account.pubkey.len() != 32 {
error!(
"bad account pubkey length: {}",
bs58::encode(account.pubkey).into_string()
);
return Ok(());
}
// Select only accounts configured to look at, plus writes to accounts
// that were previously selected (to catch closures and account reuse)
let is_selected = data
.accounts_selector
.is_account_selected(account.pubkey, account.owner);
let previously_selected = {
let read = data.active_accounts.read().unwrap();
read.contains(&account.pubkey[0..32])
};
if !is_selected && !previously_selected {
return Ok(());
}
// If the account is newly selected, add it
if !previously_selected {
let mut write = data.active_accounts.write().unwrap();
write.insert(account.pubkey.try_into().unwrap());
}
data.highest_write_slot.fetch_max(slot, Ordering::SeqCst);
debug!(
"Updating account {:?} with owner {:?} at slot {:?}",
bs58::encode(account.pubkey).into_string(),
bs58::encode(account.owner).into_string(),
slot,
);
let mut account_data = account.data.to_vec();
//zstd compress if enabled.
if data.zstd_compression {
match zstd_compress(&account_data) {
Ok(res) => account_data = res,
Err(e) => {
println!(
"zstd_decompress compression failed = {:?} , using original data.",
e
);
}
}
}
data.broadcast(UpdateOneof::AccountWrite(AccountWrite {
slot,
is_startup,
write_version: account.write_version,
pubkey: account.pubkey.to_vec(),
lamports: account.lamports,
owner: account.owner.to_vec(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account_data,
is_selected,
}));
}
}
Ok(())
}
fn update_slot_status(
&mut self,
slot: u64,
parent: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");
debug!("Updating slot {:?} at with status {:?}", slot, status);
let status = match status {
SlotStatus::Processed => SlotUpdateStatus::Processed,
SlotStatus::Confirmed => SlotUpdateStatus::Confirmed,
SlotStatus::Rooted => SlotUpdateStatus::Rooted,
};
data.broadcast(UpdateOneof::SlotUpdate(SlotUpdate {
slot,
parent,
status: status as i32,
}));
Ok(())
}
fn notify_end_of_startup(&mut self) -> PluginResult<()> {
Ok(())
}
}
impl Plugin {
    /// Reads `accounts_selector.{accounts,owners}` from the plugin's JSON
    /// config. A missing section selects everything; a missing or non-array
    /// field behaves like an empty list.
    fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector {
        let accounts_selector = &config["accounts_selector"];
        if accounts_selector.is_null() {
            return AccountsSelector::default();
        }

        // Extracts a JSON array of strings from the selector section; any
        // non-array value yields an empty list. Panics on non-string elements.
        let string_list = |field: &str| -> Vec<String> {
            accounts_selector[field]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .map(|val| val.as_str().unwrap().to_string())
                        .collect()
                })
                .unwrap_or_default()
        };

        AccountsSelector::new(&string_list("accounts"), &string_list("owners"))
    }
}
#[no_mangle]
#[allow(improper_ctypes_definitions)]
/// # Safety
///
/// This function returns the Plugin pointer as trait GeyserPlugin.
/// Ownership of the boxed plugin transfers to the caller (the loader),
/// which is responsible for eventually reclaiming and dropping it.
pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
    let plugin = Plugin::default();
    let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
    Box::into_raw(plugin)
}
#[cfg(test)]
pub(crate) mod tests {
    use {super::*, serde_json};

    /// Smoke test: a config carrying only `owners` parses into a selector
    /// without panicking.
    #[test]
    fn test_accounts_selector_from_config() {
        let config = "{\"accounts_selector\" : { \
           \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \
        }}";
        let config: serde_json::Value = serde_json::from_str(config).unwrap();
        Plugin::create_accounts_selector_from_config(&config);
    }
}

View File

@ -1,3 +0,0 @@
// Crate modules: config-driven account filtering, zstd helpers, and the
// geyser plugin + gRPC server implementation.
pub mod accounts_selector;
pub mod compression;
pub mod geyser_plugin_grpc;

View File

@ -1,84 +0,0 @@
use rand::Rng;
use tokio::sync::{broadcast, mpsc};
use tonic::transport::Server;
/// Protobuf/tonic-generated types for the `accountsdb` gRPC package.
pub mod geyser_proto {
    tonic::include_proto!("accountsdb");
}
use geyser_proto::{update::UpdateOneof, SlotUpdate, SubscribeRequest, Update};
/// Minimal AccountsDb service for the test server: every subscriber gets a
/// stream fed from one shared broadcast channel.
pub mod geyser_service {
    use super::*;
    use {
        geyser_proto::accounts_db_server::AccountsDb,
        tokio_stream::wrappers::ReceiverStream,
        tonic::{Request, Response, Status},
    };

    /// Holds the broadcast sender the producer task pushes updates into.
    #[derive(Debug)]
    pub struct Service {
        pub sender: broadcast::Sender<Update>,
    }

    impl Service {
        /// Creates the service with a fixed broadcast capacity of 100.
        pub fn new() -> Self {
            let (tx, _) = broadcast::channel(100);
            Self { sender: tx }
        }
    }

    #[tonic::async_trait]
    impl AccountsDb for Service {
        type SubscribeStream = ReceiverStream<Result<Update, Status>>;

        /// Registers a subscriber and forwards every broadcast update to it.
        /// NOTE(review): the forwarding task unwraps recv/send, so a lagged or
        /// dropped subscriber panics that task — acceptable for a test server.
        async fn subscribe(
            &self,
            _request: Request<SubscribeRequest>,
        ) -> Result<Response<Self::SubscribeStream>, Status> {
            println!("new client");
            let (tx, rx) = mpsc::channel(100);
            let mut broadcast_rx = self.sender.subscribe();
            tokio::spawn(async move {
                loop {
                    let msg = broadcast_rx.recv().await.unwrap();
                    tx.send(Ok(msg)).await.unwrap();
                }
            });
            Ok(Response::new(ReceiverStream::new(rx)))
        }
    }
}
/// Standalone test server: emits one random SlotUpdate per second to any
/// connected subscriber, so client plumbing can be exercised without a validator.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();

    let service = geyser_service::Service::new();
    let update_tx = service.sender.clone();
    let svc = geyser_proto::accounts_db_server::AccountsDbServer::new(service);

    // Background producer: fabricate slot updates, but only while somebody
    // is actually subscribed.
    tokio::spawn(async move {
        let mut current_slot = 1;
        loop {
            if update_tx.receiver_count() > 0 {
                println!("sending...");
                current_slot += 1;
                // Parent is 1-2 slots behind; status is a random enum value.
                let parent = current_slot - rand::thread_rng().gen_range(1..=2);
                let status = rand::thread_rng().gen_range(0..=2);
                update_tx
                    .send(Update {
                        update_oneof: Some(UpdateOneof::SlotUpdate(SlotUpdate {
                            slot: current_slot,
                            parent: Some(parent),
                            status,
                        })),
                    })
                    .unwrap();
            }

            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });

    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

View File

@ -1,13 +0,0 @@
#!/bin/bash
# Renders config + TLS material from the environment, then runs the
# connector in a supervise-style restart loop.
# Write the contents of env variable named by $1
printf '%s\n' "${!1}" > config.toml
# TLS CA and client cert/key are also taken from the environment.
echo "$TLS_CA" > ca.pem
echo "$TLS_CLIENT" > client.pem
# Restart forever, backing off 5 seconds between runs.
while true
do
target/release/solana-accountsdb-connector-mango config.toml
sleep 5
done

View File

@ -11,10 +11,10 @@ edition = "2021"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
solana-rpc = "=1.10.35"
solana-client = "=1.10.35"
solana-account-decoder = "=1.10.35"
solana-sdk = "=1.10.35"
solana-rpc = "~1.14.9"
solana-client = "~1.14.9"
solana-account-decoder = "~1.14.9"
solana-sdk = "~1.14.9"
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
arrayref = "*"
@ -57,7 +57,7 @@ warp = "0.3"
anchor-lang = "0.25.0"
solana-geyser-connector-plugin-grpc = { path = "../geyser-plugin-grpc" }
serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" }
[build-dependencies]
tonic-build = { version = "0.6", features = ["compression"] }

View File

@ -6,8 +6,8 @@ edition = "2021"
[dependencies]
solana-geyser-connector-lib = { path = "../lib" }
solana-logger = "=1.10.35"
solana-sdk = "=1.10.35"
solana-logger = "~1.14.9"
solana-sdk = "~1.14.9"
log = "0.4"
anyhow = "1.0"
toml = "0.5"