90 lines
2.8 KiB
Python
90 lines
2.8 KiB
Python
import asyncio
|
|
from pytest import fixture, mark
|
|
|
|
from switchboardpy import (
|
|
SBV2_DEVNET_PID,
|
|
AggregatorOpenRoundParams,
|
|
AggregatorInitParams,
|
|
AggregatorHistoryRow,
|
|
AggregatorAccount,
|
|
AccountParams,
|
|
AggregatorSaveResultParams,
|
|
AggregatorSetHistoryBufferParams,
|
|
OracleQueueAccount
|
|
)
|
|
|
|
from contextlib import contextmanager
|
|
from decimal import Decimal
|
|
from solana.keypair import Keypair
|
|
from solana.publickey import PublicKey
|
|
from solana.rpc.async_api import AsyncClient
|
|
from anchorpy import Program, Provider, Wallet
|
|
|
|
class SwitchboardProgram(object):
|
|
|
|
async def __aenter__(self):
|
|
client = AsyncClient("https://api.devnet.solana.com/")
|
|
provider = Provider(client, Wallet.local()) # 2RBU9Eie9GpBe8kY81Vo3zHwnXMBbcvh8bnb6f9CLzts
|
|
self.program = await Program.at(
|
|
SBV2_DEVNET_PID, provider
|
|
)
|
|
return self.program
|
|
|
|
async def __aexit__(self, exc_t, exc_v, exc_tb):
|
|
await self.program.close()
|
|
|
|
@mark.asyncio
|
|
async def test_load_data():
|
|
async with SwitchboardProgram() as program:
|
|
|
|
agg = AggregatorAccount(AccountParams(program=program, public_key=PublicKey("HMtDNnoCPD6NQRCE2uScEWSvwaZY3hWixCK12TKNtGpc")))
|
|
|
|
# getting aggregator data
|
|
data = await agg.load_data()
|
|
|
|
assert data.min_oracle_results == 2
|
|
assert data.oracle_request_batch_size == 3
|
|
assert data.min_job_results == 1
|
|
print(data)
|
|
|
|
@mark.asyncio
|
|
async def test_get_latest_value():
|
|
async with SwitchboardProgram() as program:
|
|
agg = AggregatorAccount(AccountParams(program=program, public_key=PublicKey("HMtDNnoCPD6NQRCE2uScEWSvwaZY3hWixCK12TKNtGpc")))
|
|
|
|
# getting most recent value
|
|
val = await agg.get_latest_value()
|
|
assert Decimal('39.792') == val
|
|
|
|
print('LATEST VALUE:')
|
|
print(val)
|
|
|
|
@mark.asyncio
|
|
async def test_get_latest_timestamp():
|
|
async with SwitchboardProgram() as program:
|
|
agg = AggregatorAccount(AccountParams(program=program, public_key=PublicKey("HMtDNnoCPD6NQRCE2uScEWSvwaZY3hWixCK12TKNtGpc")))
|
|
|
|
# getting most recent value
|
|
val = await agg.get_latest_feed_timestamp()
|
|
assert Decimal('1654626799') == val
|
|
print('LATEST TIMESTAMP:')
|
|
print(val)
|
|
|
|
@mark.asyncio
|
|
async def test_create():
|
|
async with SwitchboardProgram() as program:
|
|
await AggregatorAccount.create(
|
|
program=program,
|
|
aggregator_init_params=AggregatorInitParams(
|
|
batch_size=3,
|
|
min_required_oracle_results=2,
|
|
min_required_job_results=1,
|
|
min_update_delay_seconds=300,
|
|
queue_account=OracleQueueAccount(
|
|
AccountParams(
|
|
program=program,
|
|
public_key=PublicKey("F8ce7MsckeZAbAGmxjJNetxYXQa9mKr9nnrC3qKubyYy")
|
|
)
|
|
),
|
|
)
|
|
) |