From 5de95fa8bbfbd1de9128377d16507e020b82f58e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hern=C3=A1n=20Di=20Pietro?= Date: Fri, 8 Oct 2021 16:03:37 -0300 Subject: [PATCH] Service booting and feeding :happy: --- backend/IPriceFetcher.ts | 6 +- backend/IPublisher.ts | 13 +++- backend/PythPriceFetcher.ts | 12 ++-- backend/main.ts | 93 +++++++++++++++++++++------ backend/publisher/StdAlgoPublisher.ts | 51 +++++++++++---- backend/sleep.ts | 5 ++ backend/statusCodes.ts | 13 ++++ backend/strategy/strategy.ts | 7 +- backend/strategy/strategyBase.ts | 32 +++++++-- backend/strategy/strategyBaseQueue.ts | 36 ----------- backend/strategy/strategyLastPrice.ts | 14 ++-- package-lock.json | 17 +++++ package.json | 1 + settings.js | 21 +++--- test/backend-test.js | 0 15 files changed, 216 insertions(+), 105 deletions(-) create mode 100644 backend/sleep.ts create mode 100644 backend/statusCodes.ts delete mode 100644 backend/strategy/strategyBaseQueue.ts create mode 100644 test/backend-test.js diff --git a/backend/IPriceFetcher.ts b/backend/IPriceFetcher.ts index 477c8e454..7ac650392 100644 --- a/backend/IPriceFetcher.ts +++ b/backend/IPriceFetcher.ts @@ -7,19 +7,21 @@ */ import { PriceTicker } from './PriceTicker' +import { IStrategy } from './strategy/strategy' export interface IPriceFetcher { start(): void stop(): void + hasData(): boolean /** * Set price aggregation strategy for this fetcher. * @param IStrategy The local price aggregation strategy */ - setStrategy(IStrategy) + setStrategy(s: IStrategy) /** * Get the current price, according to running strategy. */ - queryTicker(): PriceTicker + queryTicker(): PriceTicker | undefined } diff --git a/backend/IPublisher.ts b/backend/IPublisher.ts index 3838bd9ce..81ec52530 100644 --- a/backend/IPublisher.ts +++ b/backend/IPublisher.ts @@ -1,3 +1,4 @@ +/* eslint-disable no-unused-vars */ /** * Pricecaster Service. * @@ -7,12 +8,18 @@ */ import { PriceTicker } from './PriceTicker' +import { StatusCode } from './statusCodes' -export class PublishInfo { - block: BigInt = BigInt(0) - txid: string = '' +export type PublishInfo = { + status: StatusCode, + reason?: '', + msgb64?: '', + block?: BigInt + txid?: string } export interface IPublisher { + start(): void + stop(): void publish(tick: PriceTicker): Promise } diff --git a/backend/PythPriceFetcher.ts b/backend/PythPriceFetcher.ts index cd792243f..26c942808 100644 --- a/backend/PythPriceFetcher.ts +++ b/backend/PythPriceFetcher.ts @@ -27,8 +27,8 @@ export class PythPriceFetcher implements IPriceFetcher { this.symbol = symbol } - start (): void { - this.pythConnection.start() + async start () { + await this.pythConnection.start() this.pythConnection.onPriceChange((product: Product, price: PriceData) => { if (product.symbol === this.symbol) { this.onPriceChange(price) @@ -37,14 +37,18 @@ export class PythPriceFetcher implements IPriceFetcher { } stop (): void { - throw new Error('Method not implemented.') + this.pythConnection.stop() } setStrategy (s: IStrategy) { this.strategy = s } - queryTicker (): PriceTicker { + hasData (): boolean { + return this.strategy.bufferCount() > 0 + } + + queryTicker (): PriceTicker | undefined { return this.strategy.getPrice() } diff --git a/backend/main.ts b/backend/main.ts index 1d694d14a..a0c2cf78a 100644 --- a/backend/main.ts +++ b/backend/main.ts @@ -1,7 +1,8 @@ +/* eslint-disable no-unused-vars */ /** * Pricecaster Service. * - * Fetcher backend component. + * Main program file. * * (c) 2021 Randlabs, Inc. */ @@ -9,29 +10,79 @@ import { PythPriceFetcher } from './PythPriceFetcher' import { StdAlgoPublisher } from './publisher/StdAlgoPublisher' import { StrategyLastPrice } from './strategy/strategyLastPrice' +import { IPriceFetcher } from './IPriceFetcher' +import { IPublisher, PublishInfo } from './IPublisher' +import { PriceTicker } from './PriceTicker' +import { StatusCode } from './statusCodes' +import Status from 'algosdk/dist/types/src/client/v2/algod/status' const settings = require('../settings') const algosdk = require('algosdk') +const charm = require('charm')() -console.log('Pricecaster Service Fetcher -- (c) 2021 Randlabs.io\n') - -const fetchers: { [key: string]: PythPriceFetcher } = {} -const publishers: { [key: string]: StdAlgoPublisher } = {} - -for (const sym in settings.symbols) { - console.log(`Setting up fetcher/publisher for ${sym}`) - publishers[sym] = new StdAlgoPublisher(sym, - settings.symbols[sym].priceKeeperAppId, - settings.symbols[sym].validator, - algosdk.mnemonicToSecretKey(settings.symbols[sym].mnemo) - ) - fetchers[sym] = new PythPriceFetcher(sym, new StrategyLastPrice()) +export function sleep (ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) } -// const pricefetcher = new PythPriceFetcher('BTC/USD', new StrategyLastPrice(10)) -// const publisher = new StdAlgoPublisher('BTC/USD', 38888888, ) +type WorkerRoutineStatus = { + status: StatusCode, + reason?: string, + tick?: PriceTicker, + pub?: PublishInfo +} -// async function processTick() { -// const tick = pricefetcher.queryTicker() -// const publishInfo = await publisher.publish(tick) -// setTimeout(processTick, 1000) -// }) +async function workerRoutine (fetcher: IPriceFetcher, publisher: IPublisher): Promise { + const tick = fetcher.queryTicker() + if (tick === undefined) { + return { status: StatusCode.NO_TICKER } + } + const pub = await publisher.publish(tick) + return { status: pub.status, reason: pub.reason, tick, pub } +} + +(async () => { + charm.pipe(process.stdout) + charm.reset() + console.log('Pricecaster Service Fetcher -- (c) 2021 Randlabs.io\n') + const params = settings.params + console.log(`Setting up fetcher/publisher for ${params.symbol} for PriceKeeper App ${params.priceKeeperAppId}, interval ${params.publishIntervalSecs} secs`) + + const publisher = new StdAlgoPublisher(params.symbol, + params.priceKeeperAppId, + params.validator, + (algosdk.mnemonicToSecretKey(params.mnemo)).sk + ) + const fetcher = new PythPriceFetcher(params.symbol, new StrategyLastPrice(params.bufferSize)) + await fetcher.start() + + console.log('Waiting for fetcher to boot...') + while (!fetcher.hasData()) { + await sleep(250) + } + console.log('Waiting for publisher to boot...') + await publisher.start() + console.log('Starting worker.') + + let active = true + charm.on('^C', () => { + console.log('CTRL+C: Aborted by user.') + active = false + }) + // eslint-disable-next-line no-unmodified-loop-condition + let pubCount = 0 + while (active) { + const wrs = await workerRoutine(fetcher, publisher) + switch (wrs.status) { + case StatusCode.OK: + console.log(`[PUB ${pubCount++}] ${wrs.tick!.price}±${wrs.tick!.confidence} t:${wrs.tick!.networkTime} TXID:${wrs.pub!.txid})`) + break + case StatusCode.NO_TICKER: + console.log('No ticker available from fetcher data source') + break + default: + console.log('Error. Reason: ' + wrs.reason) + } + await sleep(params.publishIntervalSecs * 1000) + } +})() diff --git a/backend/publisher/StdAlgoPublisher.ts b/backend/publisher/StdAlgoPublisher.ts index daa18159b..a4e4f60d3 100644 --- a/backend/publisher/StdAlgoPublisher.ts +++ b/backend/publisher/StdAlgoPublisher.ts @@ -1,6 +1,7 @@ import algosdk from 'algosdk' import { IPublisher, PublishInfo } from '../IPublisher' import { PriceTicker } from '../PriceTicker' +import { StatusCode } from '../statusCodes' const PricecasterLib = require('../../lib/pricecaster') const settings = require('../../settings') @@ -18,28 +19,50 @@ export class StdAlgoPublisher implements IPublisher { this.pclib.setAppId(appId) } + async start () { + await this.pclib.compileApprovalProgram() + } + + stop () { + + } + signCallback (sender: string, tx: algosdk.Transaction) { const txSigned = tx.signTxn(this.signKey) return txSigned } async publish (tick: PriceTicker): Promise { - const publishInfo = new PublishInfo() - const msg = this.pclib.createMessage( - this.symbol, - tick.price, - tick.exponent, - tick.confidence, - tick.networkTime, - this.signKey) + const publishInfo: PublishInfo = { status: StatusCode.OK } + let msg, txId + try { + msg = this.pclib.createMessage( + this.symbol, + tick.price, + BigInt(tick.exponent), + tick.confidence, + tick.networkTime, + this.signKey) + publishInfo.msgb64 = msg.toString('base64') + } catch (e: any) { + publishInfo.status = StatusCode.ERROR_CREATE_MESSAGE + publishInfo.reason = e.toString() + return publishInfo + } - const txId = await this.pclib.submitMessage( - this.validator, - msg, - this.signCallback - ) + try { + txId = await this.pclib.submitMessage( + this.validator, + msg, + this.signCallback.bind(this) + ) + publishInfo.txid = txId + } catch (e: any) { + publishInfo.status = StatusCode.ERROR_SUBMIT_MESSAGE + publishInfo.reason = e.response.text ? e.response.text : e.toString() + return publishInfo + } - publishInfo.txid = txId return publishInfo } } diff --git a/backend/sleep.ts b/backend/sleep.ts new file mode 100644 index 000000000..8b27999eb --- /dev/null +++ b/backend/sleep.ts @@ -0,0 +1,5 @@ +export function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} diff --git a/backend/statusCodes.ts b/backend/statusCodes.ts new file mode 100644 index 000000000..ad94f49cb --- /dev/null +++ b/backend/statusCodes.ts @@ -0,0 +1,13 @@ +/* eslint-disable no-unused-vars */ +export enum StatusCode { + OK, + NO_TICKER, + ERROR_CREATE_MESSAGE, + ERROR_SUBMIT_MESSAGE, + GENERAL_ERROR +} + +// export const StatusToString = { +// StatusCode.OK: 'Operation successful' + +// } diff --git a/backend/strategy/strategy.ts b/backend/strategy/strategy.ts index 88c20309f..aa02f985c 100644 --- a/backend/strategy/strategy.ts +++ b/backend/strategy/strategy.ts @@ -24,6 +24,11 @@ export interface IStrategy { */ clearBuffer(): void + /** + * Returns the current number of items in buffer + */ + bufferCount(): number + /** * Put a new price in buffer. * @param priceData The price data to put @@ -34,5 +39,5 @@ export interface IStrategy { /** * Get the calculated price according to selected strategy. */ - getPrice(): PriceTicker + getPrice(): PriceTicker | undefined } diff --git a/backend/strategy/strategyBase.ts b/backend/strategy/strategyBase.ts index 50dcadc1f..09897cf1a 100644 --- a/backend/strategy/strategyBase.ts +++ b/backend/strategy/strategyBase.ts @@ -9,13 +9,37 @@ import { PriceTicker } from '../PriceTicker' import { IStrategy } from './strategy' +/** + * A base class for queue-based buffer strategies + */ export abstract class StrategyBase implements IStrategy { + protected buffer!: PriceTicker[] + protected bufSize!: number + constructor (bufSize: number = 10) { this.createBuffer(bufSize) } - abstract put(priceData: PriceTicker): boolean - abstract createBuffer(size: number): void - abstract clearBuffer(): void - abstract getPrice(): PriceTicker + createBuffer (maxSize: number): void { + this.buffer = [] + this.bufSize = maxSize + } + + clearBuffer (): void { + this.buffer.length = 0 + } + + bufferCount (): number { + return this.buffer.length + } + + put (ticker: PriceTicker): boolean { + if (this.buffer.length === this.bufSize) { + this.buffer.shift() + } + this.buffer.push(ticker) + return true + } + + abstract getPrice(): PriceTicker | undefined } diff --git a/backend/strategy/strategyBaseQueue.ts b/backend/strategy/strategyBaseQueue.ts deleted file mode 100644 index 457cc7a31..000000000 --- a/backend/strategy/strategyBaseQueue.ts +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Pricecaster Service. - * - * Fetcher backend component. - * - * (c) 2021 Randlabs, Inc. - */ - -import { PriceTicker } from '../PriceTicker' -import { StrategyBase } from './strategyBase' - -/** - * A base class for queue-based buffer strategies - */ -export abstract class StrategyBaseQueue extends StrategyBase { - protected buffer: PriceTicker[] = [] - private bufSize: number = 0 - - createBuffer (maxSize: number): void { - this.bufSize = maxSize - } - - clearBuffer (): void { - this.buffer.length = 0 - } - - put (ticker: PriceTicker): boolean { - if (this.buffer.length === this.bufSize) { - this.buffer.shift() - } - this.buffer.push(ticker) - return true - } - - abstract getPrice(): PriceTicker -} diff --git a/backend/strategy/strategyLastPrice.ts b/backend/strategy/strategyLastPrice.ts index b73537681..fcc8b2153 100644 --- a/backend/strategy/strategyLastPrice.ts +++ b/backend/strategy/strategyLastPrice.ts @@ -1,5 +1,5 @@ import { PriceTicker } from '../PriceTicker' -import { StrategyBaseQueue } from './strategyBaseQueue' +import { StrategyBase } from './strategyBase' /** * Pricecaster Service. * @@ -12,12 +12,10 @@ import { StrategyBaseQueue } from './strategyBaseQueue' * This strategy just caches the last provided price, * acting as a single-item buffer. */ -export class StrategyLastPrice extends StrategyBaseQueue { - constructor () { - super(1) - } - - getPrice (): PriceTicker { - return this.buffer[0] +export class StrategyLastPrice extends StrategyBase { + getPrice (): PriceTicker | undefined { + const ret = this.buffer[this.buffer.length - 1] + this.clearBuffer() + return ret } } diff --git a/package-lock.json b/package-lock.json index a1cc58da6..531cdc8ed 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@pythnetwork/client": "^2.3.1", "algosdk": "^1.11.1", + "charm": "^1.0.2", "fastpriorityqueue": "^0.7.1", "js-sha512": "^0.8.0" }, @@ -999,6 +1000,14 @@ "node": ">=8" } }, + "node_modules/charm": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/charm/-/charm-1.0.2.tgz", + "integrity": "sha1-it02cVOm2aWBMxBSxAkJkdqZXjU=", + "dependencies": { + "inherits": "^2.0.1" + } + }, "node_modules/check-error": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", @@ -5171,6 +5180,14 @@ } } }, + "charm": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/charm/-/charm-1.0.2.tgz", + "integrity": "sha1-it02cVOm2aWBMxBSxAkJkdqZXjU=", + "requires": { + "inherits": "^2.0.1" + } + }, "check-error": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", diff --git a/package.json b/package.json index 8bfa3385e..a51927b48 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "dependencies": { "@pythnetwork/client": "^2.3.1", "algosdk": "^1.11.1", + "charm": "^1.0.2", "fastpriorityqueue": "^0.7.1", "js-sha512": "^0.8.0" }, diff --git a/settings.js b/settings.js index 9c1b84132..470f9a374 100644 --- a/settings.js +++ b/settings.js @@ -1,22 +1,19 @@ module.exports = { algo: { token: '', - api: 'https://api.betanet.algoexplorer.io', + api: 'https://api.testnet.algoexplorer.io', port: '' }, pyth: { solanaClusterName: 'devnet' }, - symbols: { - 'BTC/USD': { - priceKeeperAppId: 3020301, - validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU', - mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master' - }, - 'ETH/USD': { - priceKeeperAppId: 3020301, - validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU', - mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master' - } + params: { + verbose: true, + symbol: 'BTC/USD', + bufferSize: 100, + publishIntervalSecs: 30, + priceKeeperAppId: 32968790, + validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU', + mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master' } } diff --git a/test/backend-test.js b/test/backend-test.js new file mode 100644 index 000000000..e69de29bb