From ba165df268ec2d5ac49a7e5a2118b3adeef13e09 Mon Sep 17 00:00:00 2001 From: Nathaniel Parke Date: Wed, 11 Nov 2020 16:51:27 +0800 Subject: [PATCH] Order placement --- src/config.ts | 1 + src/exchange/api.ts | 592 +++++++++++++++++++++++++++++++++++++++++- src/exchange/types.ts | 14 +- src/exchange/utils.ts | 12 + src/utils.ts | 28 ++ 5 files changed, 640 insertions(+), 7 deletions(-) diff --git a/src/config.ts b/src/config.ts index 07a03b8..41fcbc6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -17,6 +17,7 @@ export const RESTART_INTERVAL_SEC = parseInt( export const HARD_CODED_MINTS = process.env.HARD_CODED_MINTS || {}; export const DEFAULT_TIMEOUT = 15000; +export const BLOCKHASH_CACHE_TIME = 30; export const NUM_CONNECTIONS = 1; export const SOLANA_URL = process.env.SOLANA_URL || "http://validator-lb.wirelesstable.net"; diff --git a/src/exchange/api.ts b/src/exchange/api.ts index 0f77356..b434bb9 100644 --- a/src/exchange/api.ts +++ b/src/exchange/api.ts @@ -1,10 +1,32 @@ -import { Account, Connection, PublicKey } from "@solana/web3.js"; -import { Coin, Exchange, MarketInfo, Pair } from "./types"; +import { + Account, + AccountInfo, Blockhash, + Connection, + Context, + PublicKey, Transaction, +} from "@solana/web3.js"; +import { + Coin, + Dir, + Exchange, + L2OrderBook, + MarketInfo, OrderType, + Pair, + RawTrade, + TimestampedL2Levels, TokenAccountInfo, + Trade, +} from "./types"; import * as config from "../config"; -import { COIN_MINTS, EXCHANGE_ENABLED_MARKETS } from "./config"; -import { getKeys } from "../utils"; +import {COIN_MINTS, EXCHANGE_ENABLED_MARKETS, MINT_COINS} from "./config"; +import {DirUtil, getKeys, getUnixTs, logger, sleep} from "../utils"; import assert from "assert"; -import { Market } from "@project-serum/serum"; +import {Market, OpenOrders, Orderbook, TokenInstructions} from "@project-serum/serum"; +import { Buffer } from "buffer"; +import BN from "bn.js"; +import {makeClientOrderId, parseTokenAccountData} from "./utils"; +import {OrderParams} from "@project-serum/serum/lib/market"; +import {BLOCKHASH_CACHE_TIME, DEFAULT_TIMEOUT} from "../config"; +import {signAndSerializeTransaction} from "./solana"; export class SerumApi { static readonly exchange: Exchange = "serum"; @@ -15,11 +37,23 @@ export class SerumApi { readonly addressMarkets: { [address: string]: Market }; readonly marketAddresses: { [market: string]: PublicKey }; readonly addressProgramIds: { [address: string]: PublicKey }; + private _loadedMarkets: { [address: string]: Market }; private _connections: Connection[]; private _publicKey: PublicKey; private _privateKey: Array; private _account: Account; private _wsConnection: Connection; + private _wsOrderbooks: { + [market: string]: { buy: TimestampedL2Levels; sell: TimestampedL2Levels }; + }; + private _wsOrderbooksConnected: string[]; + protected _tokenAccountsCache: { + [coin: string]: { accounts: TokenAccountInfo[]; ts: number }; + }; + protected _blockhashCache: { + blockhash: Blockhash; + fetchedAt: number; + }; constructor( exchange: Exchange, @@ -35,6 +69,11 @@ export class SerumApi { this._privateKey = getKeys([`${this.exchange}_private_key`])[0]; this._account = new Account(this._privateKey); this._publicKey = this._account.publicKey; + this._loadedMarkets = {}; + this._wsOrderbooks = {}; + this._wsOrderbooksConnected = []; + this._tokenAccountsCache = {}; + this._blockhashCache = { blockhash: "", fetchedAt: 0 }; this.marketInfo = marketInfo; this.markets = markets; this.marketAddresses = marketAddresses; @@ -99,6 +138,12 @@ export class SerumApi { ); } + get _connection(): Connection { + return this._connections[ + Math.floor(Math.random() * this._connections.length) + ]; + } + static async getMarketInfo( connection: Connection, coin: Coin, @@ -140,7 +185,9 @@ export class SerumApi { ]; } - async getMarketInfo(): Promise<{ [k: string]: {[prop: string]: string | number}}> { + async getMarketInfo(): Promise<{ + [k: string]: { [prop: string]: string | number }; + }> { return Object.fromEntries( Object.entries(this.marketInfo).map(([market, info]) => [ market, @@ -156,4 +203,537 @@ export class SerumApi { ]) ); } + + async getMarketFromAddress(address: string | PublicKey): Promise { + const stringAddress = + typeof address === "string" ? address : address.toBase58(); + if (stringAddress in this._loadedMarkets) { + return this._loadedMarkets[stringAddress]; + } + const pubKeyAddress = + typeof address === "string" ? new PublicKey(address) : address; + const market = await Market.load( + this._connection, + pubKeyAddress, + {}, + this.addressProgramIds[stringAddress] + ); + this._loadedMarkets[stringAddress] = market; + return market; + } + + private getMarketAddress(coin: Coin, priceCurrency: Coin): PublicKey { + return this.marketInfo[Pair.key(coin, priceCurrency)].address; + } + + async getTrades(coin?: Coin, priceCurrency?: Coin): Promise { + if (coin && priceCurrency) { + const market = await this.getMarketFromAddress( + this.getMarketAddress(coin, priceCurrency) + ); + const rawTrades = await market.loadFills(this._connection); + const ourTrades = rawTrades.filter( + (element) => !element.eventFlags.maker + ); + return this.parseRawTrades(ourTrades, coin, priceCurrency); + } + return Promise.all( + this.markets.map((market) => + this.getTrades(market.coin, market.priceCurrency) + ) + ).then((trades) => + trades.reduce((acc, curr) => { + return [...acc, ...curr]; + }) + ); + } + + parseRawTrades( + rawTrades: RawTrade[], + coin: Coin, + priceCurrency: Coin + ): Trade[] { + const parseTrade = (rawTrade: RawTrade): Trade => { + // Add ms timestamp to fill id for uniqueness + const timeSec = getUnixTs(); + const timeMs = Math.floor(timeSec * 1000); + return { + exchange: this.exchange, + coin: coin, + priceCurrency: priceCurrency, + id: `${rawTrade.orderId.toString()}|${rawTrade.size}|${timeMs}`, + orderId: rawTrade.orderId.toString(), + price: rawTrade.price, + quantity: rawTrade.size, + time: timeSec, + side: DirUtil.parse(rawTrade.side), + info: { + ...rawTrade.eventFlags, + openOrdersSlot: rawTrade.openOrdersSlot, + quantityReleased: rawTrade.nativeQuantityReleased.toString(), + quantityPaid: rawTrade.nativeQuantityPaid.toString(), + openOrders: rawTrade.openOrders.toBase58(), + }, + }; + }; + return rawTrades.map((trade) => parseTrade(trade)); + } + + async getRestOrderBook(coin: Coin, priceCurrency: Coin): Promise { + const validAt = getUnixTs(); + const marketAddress: PublicKey = this.getMarketAddress(coin, priceCurrency); + const market = await this.getMarketFromAddress(marketAddress); + const bidsPromise = market.loadBids(this._connection); + const asksPromise = market.loadAsks(this._connection); + const rawBids = await bidsPromise; + const rawAsks = await asksPromise; + const receivedAt = getUnixTs(); + return { + bids: Object.values(this.parseRawOrderBook(rawBids)), + asks: Object.values(this.parseRawOrderBook(rawAsks)), + market: new Pair(coin, priceCurrency), + validAt, + receivedAt, + }; + } + + async getWsOrderBook(coin: Coin, priceCurrency: Coin): Promise { + const market = new Pair(coin, priceCurrency); + const validAt = getUnixTs(); + await this.subscribeToOrderBookUpdates(market); + const bids = this._wsOrderbooks[market.key()][DirUtil.buySell(Dir.B)]; + const asks = this._wsOrderbooks[market.key()][DirUtil.buySell(Dir.S)]; + const receivedAt = Math.min(bids.receivedAt, asks.receivedAt); + return { + bids: bids.orderbook, + asks: asks.orderbook, + market: new Pair(coin, priceCurrency), + validAt, + receivedAt, + }; + } + + parseRawOrderBook(rawOrders: Orderbook): [number, number][] { + const orders: [number, number][] = []; + for (const [price, size] of rawOrders.getL2(100)) { + orders.push([price, size]); + } + return orders; + } + + async subscribeToOrderBookUpdates(market: Pair): Promise { + if (this._wsOrderbooksConnected.includes(market.key())) { + return; + } + const serumMarket = await this.getMarketFromAddress( + this.getMarketAddress(market.coin, market.priceCurrency) + ); + const updateCallback = (side) => ( + accountInfoUpdate: AccountInfo, + context: Context + ) => { + this._wsOrderbooks[market.key()][DirUtil.buySell(side)] = { + orderbook: this.parseRawOrderBook( + Orderbook.decode(serumMarket, accountInfoUpdate.data) + ), + receivedAt: getUnixTs(), + }; + }; + const [bids, asks] = await Promise.all([ + serumMarket.loadBids(this._connection), + serumMarket.loadAsks(this._connection), + ]); + this._wsOrderbooks[market.key()] = { + buy: { orderbook: this.parseRawOrderBook(bids), receivedAt: getUnixTs() }, + sell: { + orderbook: this.parseRawOrderBook(asks), + receivedAt: getUnixTs(), + }, + }; + this._wsConnection.onAccountChange( + serumMarket.bidsAddress, + updateCallback(Dir.B) + ); + this._wsConnection.onAccountChange( + serumMarket.asksAddress, + updateCallback(Dir.S) + ); + if (this._wsOrderbooksConnected.length == 0) { + this._wsConnection.onSlotChange((slotInfo) => {}); + } + this._wsOrderbooksConnected.push(market.key()); + } + + async awaitTransactionSignatureConfirmation( + txid: string, + timeout: number = DEFAULT_TIMEOUT + ): Promise { + let done = false; + const result: string = await new Promise((resolve, reject) => { + (async () => { + setTimeout(() => { + if (done) { + return; + } + done = true; + const message = `awaitTransactionSignature timed out waiting for signature confirmation:\ntxid ${txid}`; + logger.info(message); + reject(message); + }, timeout); + try { + this._connection.onSignature(txid, (result, context) => { + logger.info( + `awaitTransactionSignature signature confirmed via callback:\ntxid ${txid}\nresult ${JSON.stringify( + result + )}` + ); + done = true; + if (result.err) { + reject(result.err); + } else { + resolve(txid); + } + }); + } catch (e) { + done = true; + logger.info( + `awaitTransactionSignature encountered error setting up solana onSignature callback:\ntxid ${txid}\n${JSON.stringify( + result + )}` + ); + reject(e); + } + while (!done) { + (async () => { + try { + const startTime = getUnixTs(); + const signatureStatus = await this._connection.getSignatureStatuses( + [txid] + ); + logger.debug( + `getSignatureStatuses took ${getUnixTs() - startTime} seconds` + ); + const result = signatureStatus && signatureStatus.value[0]; + if (!done) { + if (!result) { + // received null result + return; + } else if (result.err) { + logger.log( + "debug", + `awaitTransactionSignature received error:\ntxid ${txid}\n${JSON.stringify( + result.err + )}` + ); + done = true; + reject(JSON.stringify(result.err)); + } else if (!result.confirmations) { + // received update with no confirmations + return; + } else { + logger.log( + "debug", + `awaitTransactionSignature received confirmation:\ntxid ${txid}\n${JSON.stringify( + result + )}` + ); + done = true; + resolve(result?.toString()); + } + } + } catch (e) { + if (!done) { + logger.info( + `awaitTransactionsSignature encountered error:\ntxid ${txid}\n${JSON.stringify( + e + )}` + ); + done = true; + reject(e); + } + } + })(); + await sleep(1000); + } + })(); + }); + done = true; + return result; + } + + async sendTransaction( + transaction: Transaction, + signers: Account[], + transactionSignatureTimeout: number = DEFAULT_TIMEOUT, + onError?: (err) => void, + ): Promise { + const blockhash = await this.getCachedBlockhash(); + const rawTransaction = await signAndSerializeTransaction( + this._connection, + transaction, + signers, + blockhash + ); + let done = false; + const startTime = getUnixTs(); + let retries = 0; + const txid = await this._connection.sendRawTransaction(rawTransaction, { + skipPreflight: true, + }); + logger.info(`Started sending transaction for: ${txid}`); + const awaitSignaturePromise = this.awaitTransactionSignatureConfirmation( + txid, + transactionSignatureTimeout + ) + .then((res) => { + done = true; + }) + .catch((e) => { + done = true; + if (onError) { + onError(e); + } else { + logger.info( + `transaction failed with error:\ntxid ${txid}\nerror ${e}` + ); + } + throw e; + }); + while (!done && getUnixTs() - startTime < DEFAULT_TIMEOUT) { + await sleep(5000); + if (retries < 2) { + this._connection.sendRawTransaction(rawTransaction, { + skipPreflight: true, + }); + retries += 1; + } + } + await awaitSignaturePromise; + return txid; + } + + async getCachedBlockhash(): Promise { + const updateBlockhashCache = async () => { + const now = getUnixTs(); + await this._connection.getRecentBlockhash().then((res) => { + this._blockhashCache = { + blockhash: res.blockhash, + fetchedAt: now, + }; + }); + }; + if (getUnixTs() - this._blockhashCache.fetchedAt > BLOCKHASH_CACHE_TIME) { + await updateBlockhashCache(); + } else if ( + getUnixTs() - this._blockhashCache.fetchedAt > + BLOCKHASH_CACHE_TIME / 2 + ) { + updateBlockhashCache(); + } + return this._blockhashCache.blockhash; + } + + async placeOrder( + side: Dir, + coin: Coin, + priceCurrency: Coin, + quantity: number, + price: number, + orderType: OrderType = OrderType.limit, + options: {[k: string]: unknown} = {} + ): Promise { + const clientId = + typeof options.clientId === "string" || + typeof options.clientId === "number" + ? new BN(options.clientId) + : makeClientOrderId(); + const { transaction, signers } = await this.makeOrderTransaction( + clientId, + side, + coin, + priceCurrency, + quantity, + price, + orderType, + options + ); + const onError = (e) => { + logger.info( + `placeOrder encountered error when creating transaction:\norderId ${clientId}\nerror ${e}` + ); + }; + const txid = await this.sendTransaction( + transaction, + signers, + 5000, + onError + ); + logger.info( + `makeOrder completed transaction for:\n${clientId.toString()}\n${txid}` + ); + return clientId.toString(); + } + + async makeOrderTransaction( + clientId: BN, + side: Dir, + coin: Coin, + priceCurrency: Coin, + quantity: number, + price: number, + orderType: OrderType = OrderType.limit, + options: {[k: string]: unknown} = {} + ): Promise<{ transaction: Transaction; signers: Account[] }> { + logger.info( + `Order parameters: ${side}, ${coin}, ${priceCurrency}, ${quantity}, ${price}, ${orderType}` + ); + const owner = new Account(this._privateKey); + let payer; + if (coin === "SOL" && side === Dir.S) { + payer = this._publicKey; + } else if (side === Dir.S) { + payer = (await this.getTokenAccounts(coin, 600))[0].pubkey; + } else { + payer = (await this.getTokenAccounts(priceCurrency, 600))[0].pubkey; + } + + const [market, openOrdersAccount] = await Promise.all([ + this.getMarketFromAddress( + this.getMarketAddress(coin, priceCurrency) + ), + this._getOpenOrdersAccountToUse(coin, priceCurrency), + ]); + + const params: OrderParams = { + owner, + payer, + side: DirUtil.buySell(side), + price, + size: quantity, + orderType, + clientId, + openOrdersAddressKey: openOrdersAccount, + }; + const transaction = new Transaction(); + transaction.add(market.makeMatchOrdersTransaction(15)); + const { + transaction: placeOrderTransaction, + signers, + } = await market.makePlaceOrderTransaction( + this._connection, + params, + 600000 + ); + transaction.add(placeOrderTransaction); + transaction.add(market.makeMatchOrdersTransaction(15)); + return { + transaction, + signers, + }; + } + + async _getOpenOrdersAccountToUse( + coin: Coin, + priceCurrency: Coin + ): Promise { + let accountsForMarket = await this.getOpenOrdersAccountsForMarket( + coin, + priceCurrency + ); + if (accountsForMarket.length === 0) { + // try again without caching in case an account was recently created + accountsForMarket = await this.getOpenOrdersAccountsForMarket( + coin, + priceCurrency, + 0 + ); + } + if (accountsForMarket.length === 0) { + const serumMarket = await this.getMarketFromAddress( + this.getMarketAddress(coin, priceCurrency) + ); + const newOpenOrdersAccount = new Account(); + await OpenOrders.makeCreateAccountTransaction( + this._connection, + serumMarket.address, + this._publicKey, + newOpenOrdersAccount.publicKey, + this.marketInfo[new Pair(coin, priceCurrency).key()].programId + ); + return newOpenOrdersAccount.publicKey; + } + return accountsForMarket.sort(this.compareOpenOrdersAccounts)[0].publicKey; + } + + compareOpenOrdersAccounts(a: OpenOrders, b: OpenOrders): number { + const aAddress = a.address.toBase58(); + const bAddress = b.address.toBase58(); + if (aAddress < bAddress) { + return -1; + } else if (aAddress === bAddress) { + return 0; + } else { + return 1; + } + } + + async getOpenOrdersAccountsForMarket( + coin: Coin, + priceCurrency: Coin, + cacheDurationSec = 60 + ): Promise { + const market = await this.getMarketFromAddress( + this.getMarketAddress(coin, priceCurrency) + ); + return market.findOpenOrdersAccountsForOwner( + this._connection, + this._publicKey, + cacheDurationSec * 1000 + ); + } + + async getTokenAccounts( + coin?: Coin, + cacheDurationSecs = 0 + ): Promise { + const now = getUnixTs(); + if ( + coin && + coin in this._tokenAccountsCache && + now - this._tokenAccountsCache[coin].ts < cacheDurationSecs + ) { + return this._tokenAccountsCache[coin].accounts; + } + const tokenAccounts = await this._connection.getTokenAccountsByOwner( + this._publicKey, + { + programId: TokenInstructions.TOKEN_PROGRAM_ID, + } + ); + + const cache: { + [coin: string]: { accounts: TokenAccountInfo[]; ts: number }; + } = {}; + for (const account of tokenAccounts.value) { + const parsedTokenAccount = { + pubkey: account.pubkey, + ...parseTokenAccountData(account.account.data), + }; + const coin = MINT_COINS[parsedTokenAccount.mint.toBase58()]; + if (!coin) { + continue; + } + if (!(coin in cache)) { + cache[coin] = { accounts: [], ts: now }; + } + cache[coin].accounts.push(parsedTokenAccount); + } + this._tokenAccountsCache = cache; + if (!coin) { + return Object.values(cache) + .map((a) => a.accounts) + .reduce((a, c) => [...a, ...c]); + } + return cache[coin].accounts; + } } diff --git a/src/exchange/types.ts b/src/exchange/types.ts index c594828..7d6aa93 100644 --- a/src/exchange/types.ts +++ b/src/exchange/types.ts @@ -128,7 +128,7 @@ export interface MarketInfo { [propName: string]: unknown; } -export interface SerumFill { +export interface RawTrade { size: number; price: number; side: string; @@ -213,3 +213,15 @@ export class SerumOrder { }; } } + +export interface TimestampedL2Levels { + orderbook: [number, number][]; + receivedAt: number; +} + +export type TokenAccountInfo = { + pubkey: PublicKey; + mint: PublicKey; + owner: PublicKey; + amount: number; +}; diff --git a/src/exchange/utils.ts b/src/exchange/utils.ts index 0b63c63..984b7a2 100644 --- a/src/exchange/utils.ts +++ b/src/exchange/utils.ts @@ -7,6 +7,7 @@ import { } from "@solana/web3.js"; import BufferLayout from "buffer-layout"; import { TokenInstructions } from "@project-serum/serum"; +import BN from "bn.js"; export const ACCOUNT_LAYOUT = BufferLayout.struct([ BufferLayout.blob(32, "mint"), @@ -79,3 +80,14 @@ export async function createAndInitializeTokenAccount({ const signers = [payer, newAccount]; return await connection.sendTransaction(transaction, signers); } + +export function makeClientOrderId(bits = 64): BN { + let binaryString = "1"; + for (let i = 1; i < bits; i++) { + binaryString += Math.max( + Math.min(Math.floor(Math.random() * 2), 1), + 0 + ).toString(); + } + return new BN(binaryString, 2); +} diff --git a/src/utils.ts b/src/utils.ts index e237b8d..59e6d18 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,6 +5,7 @@ import winston, { format } from "winston"; import "winston-daily-rotate-file"; const { combine, timestamp, printf } = format; import fs from "fs"; +import { Dir } from "./exchange/types"; // Logging if ( LOGGING_DIR && @@ -94,3 +95,30 @@ export function divideBnToNumber(numerator: BN, denominator: BN): number { const gcd = rem.gcd(denominator); return quotient + rem.div(gcd).toNumber() / denominator.div(gcd).toNumber(); } + +export class DirUtil { + public static buySell = (dir: Dir): "buy" | "sell" => { + return dir === 1 ? "buy" : "sell"; + }; + + public static parse = (raw: string | bigint | Dir): Dir => { + if (raw === Dir.B) { + return Dir.B; + } else if (raw === Dir.S) { + return Dir.S; + } else if ( + typeof raw === "string" && + ["bid", "buy", "b", "create", "long"].includes(raw.toLowerCase()) + ) { + return Dir.B; + } else if ( + typeof raw === "string" && + ["ask", "sell", "sale", "a", "s", "redeem", "short"].includes( + raw.toLowerCase() + ) + ) { + return Dir.S; + } + throw TypeError(`Cannot parse Dir from ${raw}`); + }; +}