Move postgres numeric support to separate file

This commit is contained in:
Christian Kamm 2021-11-09 14:32:58 +01:00
parent 3a0e2535f0
commit d889b5ff5b
6 changed files with 230 additions and 223 deletions

3
Cargo.lock generated
View File

@ -4292,6 +4292,8 @@ dependencies = [
"async-stream 0.2.1", "async-stream 0.2.1",
"async-trait", "async-trait",
"bs58 0.3.1", "bs58 0.3.1",
"bytes 1.1.0",
"fixed",
"futures 0.3.17", "futures 0.3.17",
"futures-core", "futures-core",
"futures-util", "futures-util",
@ -4323,7 +4325,6 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"bs58 0.3.1", "bs58 0.3.1",
"bytes 1.1.0",
"fixed", "fixed",
"log 0.4.14", "log 0.4.14",
"mango", "mango",

View File

@ -18,7 +18,6 @@ tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7.4" tokio-postgres = "0.7.4"
postgres-types = { version = "0.2", features = ["array-impls", "derive"] } postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" } postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
bytes = "1.0"
# The v3.2 branch currently has a conflicting syn version dependency, use a fixed version # The v3.2 branch currently has a conflicting syn version dependency, use a fixed version
mango = { git = "https://github.com/ckamm/mango-v3", rev = "2e1e4886544bdb1c5f773a3856305210f622fa94" } mango = { git = "https://github.com/ckamm/mango-v3", rev = "2e1e4886544bdb1c5f773a3856305210f622fa94" }

View File

@ -1,230 +1,12 @@
use { use {
async_trait::async_trait, async_trait::async_trait,
bytes::{BufMut, BytesMut},
fixed::types::I80F48,
mango::state::{DataType, MangoAccount, MangoCache, MangoGroup}, mango::state::{DataType, MangoAccount, MangoCache, MangoGroup},
mango_common::Loadable, mango_common::Loadable,
postgres_types::{IsNull, ToSql, Type}, postgres_types::ToSql,
std::{cmp, error, mem}, std::mem,
}; };
use crate::{encode_address, AccountTable, AccountWrite}; use crate::{encode_address, postgres_types_numeric::*, AccountTable, AccountWrite};
#[derive(Debug, Clone)]
pub struct SqlNumericI80F48(pub I80F48);
impl ToSql for SqlNumericI80F48 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
if self.0 == 0 {
out.reserve(10);
out.put_u16(1); // num groups
out.put_i16(0); // first group weight
out.put_u16(0); // sign
out.put_u16(0); // dscale
out.put_i16(0); // first group
return Ok(IsNull::No);
}
let abs_val = self.0.abs();
let decimals = abs_val.int_log10();
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let last_group_weight = -4i16;
let num_groups = (first_group_weight - last_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 });
// DScale
out.put_u16(16);
let mut int_part = abs_val.int().to_num::<u128>();
let mut frac_part = (abs_val.frac() * I80F48::from_num(1e16)).to_num::<u64>();
//info!("i80f48 {} {} {} {} {}", self.0, decimals, first_group_weight, int_part, frac_part);
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u128.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
//info!("int {} {} {}", weight, v, int_part);
int_part -= v * decimal_shift;
}
for weight in (last_group_weight..=cmp::min(first_group_weight, -1)).rev() {
let decimal_shift = 10000u64.pow((4 + weight) as u32);
let v = (frac_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
//info!("frac {} {} {}", weight, v, frac_part);
frac_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}
// from https://github.com/rust-lang/rust/pull/86930
mod int_log {
// 0 < val < 100_000_000
const fn less_than_8(mut val: u32) -> u32 {
let mut log = 0;
if val >= 10_000 {
val /= 10_000;
log += 4;
}
log + if val >= 1000 {
3
} else if val >= 100 {
2
} else if val >= 10 {
1
} else {
0
}
}
// 0 < val < 10_000_000_000_000_000
const fn less_than_16(mut val: u64) -> u32 {
let mut log = 0;
if val >= 100_000_000 {
val /= 100_000_000;
log += 8;
}
log + less_than_8(val as u32)
}
// 0 < val <= u64::MAX
pub const fn u64(mut val: u64) -> u32 {
let mut log = 0;
if val >= 10_000_000_000_000_000 {
val /= 10_000_000_000_000_000;
log += 16;
}
log + less_than_16(val)
}
// 0 < val <= u128::MAX
pub const fn u128(mut val: u128) -> u32 {
let mut log = 0;
if val >= 100_000_000_000_000_000_000_000_000_000_000 {
val /= 100_000_000_000_000_000_000_000_000_000_000;
log += 32;
return log + less_than_8(val as u32);
}
if val >= 10_000_000_000_000_000 {
val /= 10_000_000_000_000_000;
log += 16;
}
log + less_than_16(val as u64)
}
}
#[derive(Debug, Clone)]
pub struct SqlNumericI128(pub i128);
impl ToSql for SqlNumericI128 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
let abs_val = self.0.abs() as u128;
let decimals = if self.0 != 0 {
int_log::u128(abs_val)
} else {
0
};
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let num_groups = (first_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 });
// DScale
out.put_u16(0);
let mut int_part = abs_val;
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u128.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
int_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}
#[derive(Debug, Clone)]
pub struct SqlNumericU64(pub u64);
impl ToSql for SqlNumericU64 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
let decimals = if self.0 != 0 { int_log::u64(self.0) } else { 0 };
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let num_groups = (first_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(0);
// DScale
out.put_u16(0);
let mut int_part = self.0;
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u64.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
int_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}
#[derive(Debug, ToSql)] #[derive(Debug, ToSql)]
struct PerpAccount { struct PerpAccount {

View File

@ -35,6 +35,8 @@ bs58 = "0.3.1"
log = "0.4" log = "0.4"
rand = "0.7" rand = "0.7"
anyhow = "1.0" anyhow = "1.0"
fixed = { version = "=1.9.0", features = ["serde"] }
bytes = "1.0"
futures = "0.3.17" futures = "0.3.17"
futures-core = "0.3" futures-core = "0.3"

View File

@ -1,6 +1,7 @@
pub mod grpc_plugin_source; pub mod grpc_plugin_source;
pub mod metrics; pub mod metrics;
pub mod postgres_target; pub mod postgres_target;
pub mod postgres_types_numeric;
pub mod websocket_source; pub mod websocket_source;
use { use {

View File

@ -0,0 +1,222 @@
use {
bytes::{BufMut, BytesMut},
fixed::types::I80F48,
postgres_types::{IsNull, ToSql, Type},
std::{cmp, error},
};
#[derive(Debug, Clone)]
pub struct SqlNumericI80F48(pub I80F48);
impl ToSql for SqlNumericI80F48 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
if self.0 == 0 {
out.reserve(10);
out.put_u16(1); // num groups
out.put_i16(0); // first group weight
out.put_u16(0); // sign
out.put_u16(0); // dscale
out.put_i16(0); // first group
return Ok(IsNull::No);
}
let abs_val = self.0.abs();
let decimals = abs_val.int_log10();
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let last_group_weight = -4i16;
let num_groups = (first_group_weight - last_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 });
// DScale
out.put_u16(16);
let mut int_part = abs_val.int().to_num::<u128>();
let mut frac_part = (abs_val.frac() * I80F48::from_num(1e16)).to_num::<u64>();
//info!("i80f48 {} {} {} {} {}", self.0, decimals, first_group_weight, int_part, frac_part);
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u128.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
//info!("int {} {} {}", weight, v, int_part);
int_part -= v * decimal_shift;
}
for weight in (last_group_weight..=cmp::min(first_group_weight, -1)).rev() {
let decimal_shift = 10000u64.pow((4 + weight) as u32);
let v = (frac_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
//info!("frac {} {} {}", weight, v, frac_part);
frac_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}
// from https://github.com/rust-lang/rust/pull/86930
mod int_log {
// 0 < val < 100_000_000
const fn less_than_8(mut val: u32) -> u32 {
let mut log = 0;
if val >= 10_000 {
val /= 10_000;
log += 4;
}
log + if val >= 1000 {
3
} else if val >= 100 {
2
} else if val >= 10 {
1
} else {
0
}
}
// 0 < val < 10_000_000_000_000_000
const fn less_than_16(mut val: u64) -> u32 {
let mut log = 0;
if val >= 100_000_000 {
val /= 100_000_000;
log += 8;
}
log + less_than_8(val as u32)
}
// 0 < val <= u64::MAX
pub const fn u64(mut val: u64) -> u32 {
let mut log = 0;
if val >= 10_000_000_000_000_000 {
val /= 10_000_000_000_000_000;
log += 16;
}
log + less_than_16(val)
}
// 0 < val <= u128::MAX
pub const fn u128(mut val: u128) -> u32 {
let mut log = 0;
if val >= 100_000_000_000_000_000_000_000_000_000_000 {
val /= 100_000_000_000_000_000_000_000_000_000_000;
log += 32;
return log + less_than_8(val as u32);
}
if val >= 10_000_000_000_000_000 {
val /= 10_000_000_000_000_000;
log += 16;
}
log + less_than_16(val as u64)
}
}
#[derive(Debug, Clone)]
pub struct SqlNumericI128(pub i128);
impl ToSql for SqlNumericI128 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
let abs_val = self.0.abs() as u128;
let decimals = if self.0 != 0 {
int_log::u128(abs_val)
} else {
0
};
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let num_groups = (first_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 });
// DScale
out.put_u16(0);
let mut int_part = abs_val;
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u128.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
int_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}
#[derive(Debug, Clone)]
pub struct SqlNumericU64(pub u64);
impl ToSql for SqlNumericU64 {
fn to_sql(
&self,
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
let decimals = if self.0 != 0 { int_log::u64(self.0) } else { 0 };
let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16;
let num_groups = (first_group_weight + 1) as usize;
// Reserve bytes
out.reserve(8 + num_groups * 2);
// Number of groups
out.put_u16(num_groups as u16);
// Weight of first group
out.put_i16(first_group_weight);
// Sign
out.put_u16(0);
// DScale
out.put_u16(0);
let mut int_part = self.0;
for weight in (0..=first_group_weight).rev() {
let decimal_shift = 10000u64.pow(weight as u32);
let v = (int_part / decimal_shift) & 0xFFFF;
out.put_i16(v as i16);
int_part -= v * decimal_shift;
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
matches!(*ty, Type::NUMERIC)
}
postgres_types::to_sql_checked!();
}