Service booting and feeding :happy:

This commit is contained in:
Hernán Di Pietro 2021-10-08 16:03:37 -03:00
parent 4f439cf788
commit 5de95fa8bb
15 changed files with 216 additions and 105 deletions

View File

@ -7,19 +7,21 @@
*/ */
import { PriceTicker } from './PriceTicker' import { PriceTicker } from './PriceTicker'
import { IStrategy } from './strategy/strategy'
export interface IPriceFetcher { export interface IPriceFetcher {
start(): void start(): void
stop(): void stop(): void
hasData(): boolean
/** /**
* Set price aggregation strategy for this fetcher. * Set price aggregation strategy for this fetcher.
* @param IStrategy The local price aggregation strategy * @param IStrategy The local price aggregation strategy
*/ */
setStrategy(IStrategy) setStrategy(s: IStrategy)
/** /**
* Get the current price, according to running strategy. * Get the current price, according to running strategy.
*/ */
queryTicker(): PriceTicker queryTicker(): PriceTicker | undefined
} }

View File

@ -1,3 +1,4 @@
/* eslint-disable no-unused-vars */
/** /**
* Pricecaster Service. * Pricecaster Service.
* *
@ -7,12 +8,18 @@
*/ */
import { PriceTicker } from './PriceTicker' import { PriceTicker } from './PriceTicker'
import { StatusCode } from './statusCodes'
export class PublishInfo { export type PublishInfo = {
block: BigInt = BigInt(0) status: StatusCode,
txid: string = '' reason?: '',
msgb64?: '',
block?: BigInt
txid?: string
} }
export interface IPublisher { export interface IPublisher {
start(): void
stop(): void
publish(tick: PriceTicker): Promise<PublishInfo> publish(tick: PriceTicker): Promise<PublishInfo>
} }

View File

@ -27,8 +27,8 @@ export class PythPriceFetcher implements IPriceFetcher {
this.symbol = symbol this.symbol = symbol
} }
start (): void { async start () {
this.pythConnection.start() await this.pythConnection.start()
this.pythConnection.onPriceChange((product: Product, price: PriceData) => { this.pythConnection.onPriceChange((product: Product, price: PriceData) => {
if (product.symbol === this.symbol) { if (product.symbol === this.symbol) {
this.onPriceChange(price) this.onPriceChange(price)
@ -37,14 +37,18 @@ export class PythPriceFetcher implements IPriceFetcher {
} }
stop (): void { stop (): void {
throw new Error('Method not implemented.') this.pythConnection.stop()
} }
setStrategy (s: IStrategy) { setStrategy (s: IStrategy) {
this.strategy = s this.strategy = s
} }
queryTicker (): PriceTicker { hasData (): boolean {
return this.strategy.bufferCount() > 0
}
queryTicker (): PriceTicker | undefined {
return this.strategy.getPrice() return this.strategy.getPrice()
} }

View File

@ -1,7 +1,8 @@
/* eslint-disable no-unused-vars */
/** /**
* Pricecaster Service. * Pricecaster Service.
* *
* Fetcher backend component. * Main program file.
* *
* (c) 2021 Randlabs, Inc. * (c) 2021 Randlabs, Inc.
*/ */
@ -9,29 +10,79 @@
import { PythPriceFetcher } from './PythPriceFetcher' import { PythPriceFetcher } from './PythPriceFetcher'
import { StdAlgoPublisher } from './publisher/StdAlgoPublisher' import { StdAlgoPublisher } from './publisher/StdAlgoPublisher'
import { StrategyLastPrice } from './strategy/strategyLastPrice' import { StrategyLastPrice } from './strategy/strategyLastPrice'
import { IPriceFetcher } from './IPriceFetcher'
import { IPublisher, PublishInfo } from './IPublisher'
import { PriceTicker } from './PriceTicker'
import { StatusCode } from './statusCodes'
import Status from 'algosdk/dist/types/src/client/v2/algod/status'
const settings = require('../settings') const settings = require('../settings')
const algosdk = require('algosdk') const algosdk = require('algosdk')
const charm = require('charm')()
console.log('Pricecaster Service Fetcher -- (c) 2021 Randlabs.io\n') export function sleep (ms: number) {
return new Promise((resolve) => {
const fetchers: { [key: string]: PythPriceFetcher } = {} setTimeout(resolve, ms)
const publishers: { [key: string]: StdAlgoPublisher } = {} })
for (const sym in settings.symbols) {
console.log(`Setting up fetcher/publisher for ${sym}`)
publishers[sym] = new StdAlgoPublisher(sym,
settings.symbols[sym].priceKeeperAppId,
settings.symbols[sym].validator,
algosdk.mnemonicToSecretKey(settings.symbols[sym].mnemo)
)
fetchers[sym] = new PythPriceFetcher(sym, new StrategyLastPrice())
} }
// const pricefetcher = new PythPriceFetcher('BTC/USD', new StrategyLastPrice(10)) type WorkerRoutineStatus = {
// const publisher = new StdAlgoPublisher('BTC/USD', 38888888, ) status: StatusCode,
reason?: string,
tick?: PriceTicker,
pub?: PublishInfo
}
// async function processTick() { async function workerRoutine (fetcher: IPriceFetcher, publisher: IPublisher): Promise<WorkerRoutineStatus> {
// const tick = pricefetcher.queryTicker() const tick = fetcher.queryTicker()
// const publishInfo = await publisher.publish(tick) if (tick === undefined) {
// setTimeout(processTick, 1000) return { status: StatusCode.NO_TICKER }
// }) }
const pub = await publisher.publish(tick)
return { status: pub.status, reason: pub.reason, tick, pub }
}
(async () => {
charm.pipe(process.stdout)
charm.reset()
console.log('Pricecaster Service Fetcher -- (c) 2021 Randlabs.io\n')
const params = settings.params
console.log(`Setting up fetcher/publisher for ${params.symbol} for PriceKeeper App ${params.priceKeeperAppId}, interval ${params.publishIntervalSecs} secs`)
const publisher = new StdAlgoPublisher(params.symbol,
params.priceKeeperAppId,
params.validator,
(algosdk.mnemonicToSecretKey(params.mnemo)).sk
)
const fetcher = new PythPriceFetcher(params.symbol, new StrategyLastPrice(params.bufferSize))
await fetcher.start()
console.log('Waiting for fetcher to boot...')
while (!fetcher.hasData()) {
await sleep(250)
}
console.log('Waiting for publisher to boot...')
await publisher.start()
console.log('Starting worker.')
let active = true
charm.on('^C', () => {
console.log('CTRL+C: Aborted by user.')
active = false
})
// eslint-disable-next-line no-unmodified-loop-condition
let pubCount = 0
while (active) {
const wrs = await workerRoutine(fetcher, publisher)
switch (wrs.status) {
case StatusCode.OK:
console.log(`[PUB ${pubCount++}] ${wrs.tick!.price}±${wrs.tick!.confidence} t:${wrs.tick!.networkTime} TXID:${wrs.pub!.txid})`)
break
case StatusCode.NO_TICKER:
console.log('No ticker available from fetcher data source')
break
default:
console.log('Error. Reason: ' + wrs.reason)
}
await sleep(params.publishIntervalSecs * 1000)
}
})()

View File

@ -1,6 +1,7 @@
import algosdk from 'algosdk' import algosdk from 'algosdk'
import { IPublisher, PublishInfo } from '../IPublisher' import { IPublisher, PublishInfo } from '../IPublisher'
import { PriceTicker } from '../PriceTicker' import { PriceTicker } from '../PriceTicker'
import { StatusCode } from '../statusCodes'
const PricecasterLib = require('../../lib/pricecaster') const PricecasterLib = require('../../lib/pricecaster')
const settings = require('../../settings') const settings = require('../../settings')
@ -18,28 +19,50 @@ export class StdAlgoPublisher implements IPublisher {
this.pclib.setAppId(appId) this.pclib.setAppId(appId)
} }
async start () {
await this.pclib.compileApprovalProgram()
}
stop () {
}
signCallback (sender: string, tx: algosdk.Transaction) { signCallback (sender: string, tx: algosdk.Transaction) {
const txSigned = tx.signTxn(this.signKey) const txSigned = tx.signTxn(this.signKey)
return txSigned return txSigned
} }
async publish (tick: PriceTicker): Promise<PublishInfo> { async publish (tick: PriceTicker): Promise<PublishInfo> {
const publishInfo = new PublishInfo() const publishInfo: PublishInfo = { status: StatusCode.OK }
const msg = this.pclib.createMessage( let msg, txId
this.symbol, try {
tick.price, msg = this.pclib.createMessage(
tick.exponent, this.symbol,
tick.confidence, tick.price,
tick.networkTime, BigInt(tick.exponent),
this.signKey) tick.confidence,
tick.networkTime,
this.signKey)
publishInfo.msgb64 = msg.toString('base64')
} catch (e: any) {
publishInfo.status = StatusCode.ERROR_CREATE_MESSAGE
publishInfo.reason = e.toString()
return publishInfo
}
const txId = await this.pclib.submitMessage( try {
this.validator, txId = await this.pclib.submitMessage(
msg, this.validator,
this.signCallback msg,
) this.signCallback.bind(this)
)
publishInfo.txid = txId
} catch (e: any) {
publishInfo.status = StatusCode.ERROR_SUBMIT_MESSAGE
publishInfo.reason = e.response.text ? e.response.text : e.toString()
return publishInfo
}
publishInfo.txid = txId
return publishInfo return publishInfo
} }
} }

5
backend/sleep.ts Normal file
View File

@ -0,0 +1,5 @@
export function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

13
backend/statusCodes.ts Normal file
View File

@ -0,0 +1,13 @@
/* eslint-disable no-unused-vars */
export enum StatusCode {
OK,
NO_TICKER,
ERROR_CREATE_MESSAGE,
ERROR_SUBMIT_MESSAGE,
GENERAL_ERROR
}
// export const StatusToString = {
// StatusCode.OK: 'Operation successful'
// }

View File

@ -24,6 +24,11 @@ export interface IStrategy {
*/ */
clearBuffer(): void clearBuffer(): void
/**
* Returns the current number of items in buffer
*/
bufferCount(): number
/** /**
* Put a new price in buffer. * Put a new price in buffer.
* @param priceData The price data to put * @param priceData The price data to put
@ -34,5 +39,5 @@ export interface IStrategy {
/** /**
* Get the calculated price according to selected strategy. * Get the calculated price according to selected strategy.
*/ */
getPrice(): PriceTicker getPrice(): PriceTicker | undefined
} }

View File

@ -9,13 +9,37 @@
import { PriceTicker } from '../PriceTicker' import { PriceTicker } from '../PriceTicker'
import { IStrategy } from './strategy' import { IStrategy } from './strategy'
/**
* A base class for queue-based buffer strategies
*/
export abstract class StrategyBase implements IStrategy { export abstract class StrategyBase implements IStrategy {
protected buffer!: PriceTicker[]
protected bufSize!: number
constructor (bufSize: number = 10) { constructor (bufSize: number = 10) {
this.createBuffer(bufSize) this.createBuffer(bufSize)
} }
abstract put(priceData: PriceTicker): boolean createBuffer (maxSize: number): void {
abstract createBuffer(size: number): void this.buffer = []
abstract clearBuffer(): void this.bufSize = maxSize
abstract getPrice(): PriceTicker }
clearBuffer (): void {
this.buffer.length = 0
}
bufferCount (): number {
return this.buffer.length
}
put (ticker: PriceTicker): boolean {
if (this.buffer.length === this.bufSize) {
this.buffer.shift()
}
this.buffer.push(ticker)
return true
}
abstract getPrice(): PriceTicker | undefined
} }

View File

@ -1,36 +0,0 @@
/**
* Pricecaster Service.
*
* Fetcher backend component.
*
* (c) 2021 Randlabs, Inc.
*/
import { PriceTicker } from '../PriceTicker'
import { StrategyBase } from './strategyBase'
/**
* A base class for queue-based buffer strategies
*/
export abstract class StrategyBaseQueue extends StrategyBase {
protected buffer: PriceTicker[] = []
private bufSize: number = 0
createBuffer (maxSize: number): void {
this.bufSize = maxSize
}
clearBuffer (): void {
this.buffer.length = 0
}
put (ticker: PriceTicker): boolean {
if (this.buffer.length === this.bufSize) {
this.buffer.shift()
}
this.buffer.push(ticker)
return true
}
abstract getPrice(): PriceTicker
}

View File

@ -1,5 +1,5 @@
import { PriceTicker } from '../PriceTicker' import { PriceTicker } from '../PriceTicker'
import { StrategyBaseQueue } from './strategyBaseQueue' import { StrategyBase } from './strategyBase'
/** /**
* Pricecaster Service. * Pricecaster Service.
* *
@ -12,12 +12,10 @@ import { StrategyBaseQueue } from './strategyBaseQueue'
* This strategy just caches the last provided price, * This strategy just caches the last provided price,
* acting as a single-item buffer. * acting as a single-item buffer.
*/ */
export class StrategyLastPrice extends StrategyBaseQueue { export class StrategyLastPrice extends StrategyBase {
constructor () { getPrice (): PriceTicker | undefined {
super(1) const ret = this.buffer[this.buffer.length - 1]
} this.clearBuffer()
return ret
getPrice (): PriceTicker {
return this.buffer[0]
} }
} }

17
package-lock.json generated
View File

@ -11,6 +11,7 @@
"dependencies": { "dependencies": {
"@pythnetwork/client": "^2.3.1", "@pythnetwork/client": "^2.3.1",
"algosdk": "^1.11.1", "algosdk": "^1.11.1",
"charm": "^1.0.2",
"fastpriorityqueue": "^0.7.1", "fastpriorityqueue": "^0.7.1",
"js-sha512": "^0.8.0" "js-sha512": "^0.8.0"
}, },
@ -999,6 +1000,14 @@
"node": ">=8" "node": ">=8"
} }
}, },
"node_modules/charm": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/charm/-/charm-1.0.2.tgz",
"integrity": "sha1-it02cVOm2aWBMxBSxAkJkdqZXjU=",
"dependencies": {
"inherits": "^2.0.1"
}
},
"node_modules/check-error": { "node_modules/check-error": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz",
@ -5171,6 +5180,14 @@
} }
} }
}, },
"charm": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/charm/-/charm-1.0.2.tgz",
"integrity": "sha1-it02cVOm2aWBMxBSxAkJkdqZXjU=",
"requires": {
"inherits": "^2.0.1"
}
},
"check-error": { "check-error": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz",

View File

@ -12,6 +12,7 @@
"dependencies": { "dependencies": {
"@pythnetwork/client": "^2.3.1", "@pythnetwork/client": "^2.3.1",
"algosdk": "^1.11.1", "algosdk": "^1.11.1",
"charm": "^1.0.2",
"fastpriorityqueue": "^0.7.1", "fastpriorityqueue": "^0.7.1",
"js-sha512": "^0.8.0" "js-sha512": "^0.8.0"
}, },

View File

@ -1,22 +1,19 @@
module.exports = { module.exports = {
algo: { algo: {
token: '', token: '',
api: 'https://api.betanet.algoexplorer.io', api: 'https://api.testnet.algoexplorer.io',
port: '' port: ''
}, },
pyth: { pyth: {
solanaClusterName: 'devnet' solanaClusterName: 'devnet'
}, },
symbols: { params: {
'BTC/USD': { verbose: true,
priceKeeperAppId: 3020301, symbol: 'BTC/USD',
validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU', bufferSize: 100,
mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master' publishIntervalSecs: 30,
}, priceKeeperAppId: 32968790,
'ETH/USD': { validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU',
priceKeeperAppId: 3020301, mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master'
validator: 'OPDM7ACAW64Q4VBWAL77Z5SHSJVZZ44V3BAN7W44U43SUXEOUENZMZYOQU',
mnemo: 'assault approve result rare float sugar power float soul kind galaxy edit unusual pretty tone tilt net range pelican avoid unhappy amused recycle abstract master'
}
} }
} }

0
test/backend-test.js Normal file
View File