from __future__ import annotations

import hashlib
import struct
import time
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, NamedTuple, Optional

import anchorpy
from solana.keypair import Keypair
from solana.publickey import PublicKey
from solana.system_program import CreateAccountParams, create_account
from solana.transaction import TransactionSignature
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from spl.token.instructions import get_associated_token_address

from switchboardpy.common import AccountParams, SwitchboardDecimal
from switchboardpy.compiled import OracleJob
from switchboardpy.job import JobAccount
from switchboardpy.lease import LeaseAccount
from switchboardpy.oracle import OracleAccount
from switchboardpy.oraclequeue import OracleQueueAccount
from switchboardpy.permission import PermissionAccount
from switchboardpy.program import ProgramStateAccount


@dataclass
class AggregatorSaveResultParams:
    """Parameters an assigned oracle submits when responding to an update request."""

    # Index in the list of oracles in the aggregator assigned to this round update.
    oracle_idx: int
    # Reports that an error occurred and the oracle could not send a value.
    error: bool
    # Value the oracle is responding with for this update.
    value: Decimal
    # The minimum value this oracle has seen this round for the jobs listed in the aggregator.
    min_response: Decimal
    # The maximum value this oracle has seen this round for the jobs listed in the aggregator.
    max_response: Decimal
    # List of OracleJobs that were performed to produce this result.
    jobs: list[OracleJob]
    # Authority of the queue the aggregator is attached to.
    queue_authority: PublicKey
    # Program token mint.
    token_mint: PublicKey
    # List of parsed oracles.
    oracles: list[Any]


@dataclass
class AggregatorSetHistoryBufferParams:
    """Parameters for creating and setting a history buffer."""

    # Number of elements for the history buffer to fit.
    size: int
    # Authority keypair for the aggregator.
    authority: Keypair = None


@dataclass
class AggregatorOpenRoundParams:
    """Parameters required to open an aggregator round."""

    # The oracle queue from which oracles are assigned this update.
    oracle_queue_account: OracleQueueAccount
    # The token wallet which will receive rewards for calling update on this feed.
    payout_wallet: PublicKey


@dataclass
class AggregatorInitParams:
    """Init params for aggregators."""

    # Number of oracles to request on aggregator update.
    batch_size: int
    # Minimum number of oracle responses required before a round is validated.
    min_required_oracle_results: int
    # Minimum number of job results required before an oracle response is accepted.
    # (Original doc repeated the update-delay text here by copy-paste error.)
    min_required_job_results: int
    # Minimum number of seconds required between aggregator rounds.
    min_update_delay_seconds: int
    # The queue to which this aggregator will be linked.
    queue_account: OracleQueueAccount
    # Name of the aggregator to store on-chain.
    name: bytes = None
    # Metadata of the aggregator to store on-chain.
    metadata: bytes = None
    # unix_timestamp for which no feed update will occur before.
    start_after: int = None
    # Change percentage required between a previous round and the current round.
    # If variance percentage is not met, reject new oracle responses.
    variance_threshold: Decimal = None
    # Number of seconds for which, even if the variance threshold is not passed,
    # accept new responses from oracles.
    force_report_period: int = None
    # unix_timestamp after which funds may be withdrawn from the aggregator.
    # null/undefined/0 means the feed has no expiration.
    expiration: int = None
    # Keypair to use for the new aggregator account (generated when omitted).
    # (Original doc duplicated the author_wallet text here by copy-paste error.)
    keypair: Keypair = None
    # An optional wallet for receiving kickbacks from job usage in feeds.
    # Defaults to token vault.
    author_wallet: PublicKey = None
    # If included, this pubkey will be the aggregator authority rather than
    # the aggregator keypair.
    authority: PublicKey = None


@dataclass
class AggregatorHistoryRow:
    """Wrapper for the row structure of elements in the aggregator history buffer.

    Attributes:
        timestamp (int): timestamp of the aggregator result
        value (Decimal): aggregator value at the timestamp
    """

    timestamp: int
    value: Decimal

    @staticmethod
    def from_buffer(buf: bytes) -> "AggregatorHistoryRow":
        """Generate an AggregatorHistoryRow from one 28-byte buffer row.

        Assumed row layout, little endian: i64 timestamp | i128 mantissa | u32 scale
        (8 + 16 + 4 = 28 bytes, matching the ROW_SIZE used by load_history).

        NOTE(review): reconstructed — the struct format strings and most of this
        body were destroyed in the original source (stripped "<...>" spans).
        Verify offsets and signedness against the on-chain account layout.

        Args:
            buf (bytes): one raw history row sliced out of the history buffer
        Returns:
            AggregatorHistoryRow
        """
        timestamp: int = struct.unpack_from("<q", buf, 0)[0]
        mantissa: int = int.from_bytes(buf[8:24], "little", signed=True)
        scale: int = struct.unpack_from("<I", buf, 24)[0]
        # value = mantissa * 10 ** -scale
        value = Decimal(mantissa).scaleb(-scale)
        return AggregatorHistoryRow(timestamp, value)


class AggregatorAccount:
    """Account type representing an aggregator (data feed).

    NOTE(review): the class header and constructor were destroyed in the
    original source; this constructor is reconstructed to match the other
    switchboardpy account wrappers (program + public_key/keypair) — confirm
    against the package's sibling account classes.
    """

    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair

    @staticmethod
    def get_name(aggregator: Any) -> str:
        """Get the name of an aggregator from loaded account data.

        NUL padding bytes are replaced with '*' as in the original.
        The original chained str.decode()/str.encode() (invalid on Python 3
        str) and unpacked the name into map(); both raised at runtime.

        Args:
            aggregator (Any): Anchor-loaded aggregator
        Returns:
            str: name of the aggregator
        """
        return ''.join(map(chr, aggregator.name)).replace("\u0000", "*")

    async def load_data(self) -> Any:
        """Load and parse AggregatorAccount state based on the program IDL.

        Returns:
            Any: AggregatorAccountData parsed in accordance with the Switchboard IDL.
        Raises:
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        aggregator = await self.program.account["AggregatorAccountData"].fetch(self.public_key)
        # Drop the reserved buffer; callers don't need it.
        aggregator.ebuf = None
        return aggregator

    async def load_history(self, aggregator: Any = None) -> Any:
        """Get AggregatorAccount historical data.

        Args:
            aggregator (Any): Optional pre-loaded aggregator data
        Returns:
            list[AggregatorHistoryRow]: history rows, oldest-insertion order
        Raises:
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        # If aggregator data passed in - use that, else load this aggregator.
        aggregator = aggregator if aggregator else await self.load_data()
        # Default (zeroed-out) public key means no history buffer has been set.
        # Original compared against a bare int literal, which never matches.
        if aggregator.history_buffer == PublicKey('11111111111111111111111111111111'):
            return []
        # Fixed AggregatorHistoryRow size.
        ROW_SIZE = 28
        info = await self.program.provider.connection.get_account_info(aggregator.history_buffer)
        buffer = info.data if info else []
        # Need at least discriminator (8) + insertion index (4).
        if not buffer or len(buffer) < 12:
            return []
        # Read u32 little-endian insertion index, just after the 8-byte discriminator.
        insert_idx: int = struct.unpack_from("<L", buffer, 8)[0] * ROW_SIZE
        front: list[AggregatorHistoryRow] = []
        tail: list[AggregatorHistoryRow] = []
        # Rows begin after discriminator + insertion index.
        # NOTE(review): loop header reconstructed (stripped in original);
        # confirm the 12-byte data offset against the on-chain layout.
        for i in range(12, len(buffer), ROW_SIZE):
            if i + ROW_SIZE > len(buffer):
                break
            row = AggregatorHistoryRow.from_buffer(buffer[i:i + ROW_SIZE])
            if row.timestamp == 0:
                break
            # Rows at or before the insertion cursor are the older entries.
            if i <= insert_idx:
                tail.append(row)
            else:
                front.append(row)
        # Original returned front.extend(tail), which is always None.
        return front + tail

    async def get_latest_value(self, aggregator: Optional[Any] = None) -> Decimal:
        """Get the latest confirmed value stored in the aggregator account.

        Args:
            aggregator (Any): Optional aggregator value to pass in
        Returns:
            Decimal: the latest feed value
        Raises:
            ValueError: If the aggregator currently holds no value.
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')
        return SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result)

    async def get_latest_feed_timestamp(self, aggregator: Optional[Any] = None) -> int:
        """Get the timestamp of the latest confirmed round stored in the aggregator account.

        Args:
            aggregator (Any): Optional aggregator value to pass in
        Returns:
            int: round_open_timestamp of the latest confirmed round
                 (original annotated Decimal, but the raw field is returned)
        Raises:
            ValueError: If the aggregator currently holds no value.
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')
        return aggregator.latest_confirmed_round.round_open_timestamp

    @staticmethod
    def should_report_value(value: Decimal, aggregator: Optional[Any] = None) -> bool:
        """Decide whether an oracle value should be reported for this feed.

        Reports when there is no confirmed value yet, when the force-report
        period has elapsed, or when the value moved outside the variance
        threshold around the latest result.

        Args:
            value (Decimal): candidate value to report
            aggregator (Any): Anchor-loaded aggregator data
        Returns:
            bool: True when the value should be reported
        """
        # Original read aggregator.latestConfirmedRound (camelCase JS-ism),
        # which raises AttributeError on anchorpy's snake_case containers.
        if aggregator.latest_confirmed_round and aggregator.latest_confirmed_round.num_success == 0:
            return True
        # time.time() is already in seconds; the original divided by 1000,
        # a bug ported from JS Date.now() milliseconds.
        timestamp = int(time.time())
        if aggregator.start_after > timestamp:
            return False
        variance_threshold = SwitchboardDecimal.sbd_to_decimal(aggregator.variance_threshold)
        latest_result = SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result)
        force_report_period = aggregator.force_report_period
        last_timestamp = aggregator.latest_confirmed_round.round_open_timestamp
        if last_timestamp + force_report_period < timestamp:
            return True
        if value < latest_result - variance_threshold:
            return True
        if value > latest_result + variance_threshold:
            return True
        return False

    async def get_confirmed_round_results(self, aggregator: Optional[Any] = None) -> list:
        """Get the individual oracle results of the latest confirmed round.

        Args:
            aggregator (Any): Optional aggregator value to pass in
        Returns:
            list: dicts of {"oracle_account": OracleAccount, "value": Decimal}
        Raises:
            ValueError: If the aggregator currently holds no value.
        """
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')
        results: list[Any] = []
        for i in range(aggregator.oracle_request_batch_size):
            # Original read 'medians_filfilled' (typo); the IDL field is
            # medians_fulfilled — TODO confirm against the program IDL.
            if aggregator.latest_confirmed_round.medians_fulfilled[i]:
                results.append({
                    "oracle_account": OracleAccount(AccountParams(
                        program=self.program,
                        public_key=aggregator.latest_confirmed_round.oracle_pubkeys_data[i],
                    )),
                    "value": SwitchboardDecimal.sbd_to_decimal(
                        aggregator.latest_confirmed_round.medians_data[i]
                    ),
                })
        return results

    @staticmethod
    def produce_job_hash(jobs: list[OracleJob]):
        """Get the hash of a list of OracleJobs.

        Each job is hashed individually and the per-job digests are folded
        into a single SHA-256 accumulator.

        Args:
            jobs (list[OracleJob]): list of jobs to hash
        Returns:
            hashlib sha256 object (call .digest()/.hexdigest() on it)
        """
        jobs_hash = hashlib.sha256()
        for job in jobs:
            job_hasher = hashlib.sha256()
            job_hasher.update(job.SerializeToString())
            jobs_hash.update(job_hasher.digest())
        return jobs_hash

    async def load_jobs(self, aggregator: Optional[Any] = None) -> list[OracleJob]:
        """Load and deserialize all jobs stored in this aggregator.

        Args:
            aggregator (Any): Optional aggregator
        Returns:
            list[OracleJob]: deserialized job definitions
        Raises:
            ValueError: Failed to load feed jobs.
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        coder = anchorpy.AccountsCoder(self.program.idl)
        aggregator = aggregator if aggregator else await self.load_data()
        # Original sliced the coroutine before awaiting it (TypeError); slice
        # the pubkey list to the populated length instead.
        job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(
            self.program.provider,
            aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size],
        )
        if not job_accounts_raw:
            raise ValueError('Failed to load feed jobs.')
        # Deserialize OracleJob protos from each decoded JobAccountData.
        # FromString is the protobuf classmethod (ParseFromString, used by the
        # original, is an instance method). NOTE(review): assumes each fetched
        # item exposes .account.data — confirm against anchorpy's rpc helper.
        return [
            OracleJob.FromString(coder.decode(job.account.data).data)
            for job in job_accounts_raw
        ]

    async def load_hashes(self, aggregator: Optional[Any] = None) -> list:
        """Load all job hashes for each job stored in this aggregator.

        Args:
            aggregator (Any): Optional aggregator
        Returns:
            list: hash for each job account
        Raises:
            ValueError: Failed to load feed jobs.
            AccountDoesNotExistError: If the account doesn't exist.
            AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
        """
        coder = anchorpy.AccountsCoder(self.program.idl)
        # Original called self.loadData(), which does not exist (AttributeError).
        aggregator = aggregator if aggregator else await self.load_data()
        # Same coroutine-slicing bug as load_jobs; slice the pubkey list.
        job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(
            self.program.provider,
            aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size],
        )
        if not job_accounts_raw:
            raise ValueError('Failed to load feed jobs.')
        # Get hashes from each decoded JobAccountData.
        # NOTE(review): assumes .account.data wrapper — see load_jobs.
        return [coder.decode(job.account.data).hash for job in job_accounts_raw]

    def size(self):
        """Get the size of an AggregatorAccount on chain.

        Returns:
            int: size of the AggregatorAccount on chain
        """
        return self.program.account["AggregatorAccountData"].size

    @staticmethod
    async def create(program: anchorpy.Program, aggregator_init_params: AggregatorInitParams):
        """Create and initialize the AggregatorAccount.

        Args:
            program (anchorpy.Program): Switchboard program representation
                holding connection and IDL
            aggregator_init_params (AggregatorInitParams): init params for the aggregator
        Returns:
            AggregatorAccount
        """
        aggregator_account = aggregator_init_params.keypair or Keypair.generate()
        authority = aggregator_init_params.authority or aggregator_account.public_key
        size = program.account["AggregatorAccountData"].size
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        state = await state_account.load_data()
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        zero_decimal = program.type['SwitchboardDecimal'](0, 0)
        await program.rpc["aggregator_init"](
            {
                "name": aggregator_init_params.name or bytes([0] * 32),
                "metadata": aggregator_init_params.metadata or bytes([0] * 128),
                "batch_size": aggregator_init_params.batch_size,
                "min_oracle_results": aggregator_init_params.min_required_oracle_results,
                "min_job_results": aggregator_init_params.min_required_job_results,
                "min_update_delay_seconds": aggregator_init_params.min_update_delay_seconds,
                "variance_threshold": SwitchboardDecimal.from_decimal(
                    aggregator_init_params.variance_threshold
                ).as_proper_sbd(program) if aggregator_init_params.variance_threshold else zero_decimal,
                "force_report_period": aggregator_init_params.force_report_period or 0,
                "expiration": aggregator_init_params.expiration or 0,
                "state_bump": state_bump,
                # Default to 0 so a None start_after cannot break serialization.
                "start_after": aggregator_init_params.start_after or 0,
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": aggregator_account.public_key,
                    "authority": authority,
                    "queue": aggregator_init_params.queue_account.public_key,
                    "author_wallet": aggregator_init_params.author_wallet or state.token_vault,
                    "program_state": state_account.public_key,
                },
                signers=[aggregator_account],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key,
                            new_account_pubkey=aggregator_account.public_key,
                            lamports=lamports,
                            space=size,
                            program_id=program.program_id,
                        )
                    )
                ],
            ),
        )
        return AggregatorAccount(AccountParams(program=program, keypair=aggregator_account))

    async def set_history_buffer(self, params: AggregatorSetHistoryBufferParams):
        """Create and set a history buffer for the aggregator.

        Args:
            params (AggregatorSetHistoryBufferParams)
        Returns:
            TransactionSignature
        """
        buffer = Keypair.generate()
        program = self.program
        authority = params.authority or self.keypair
        # Buffer layout: 8-byte discriminator + 4-byte insertion index + rows.
        HISTORY_ROW_SIZE = 28
        INSERT_IDX_SIZE = 4
        DISCRIMINATOR_SIZE = 8
        size = params.size * HISTORY_ROW_SIZE + INSERT_IDX_SIZE + DISCRIMINATOR_SIZE
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        await program.rpc["aggregator_set_history_buffer"](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "buffer": buffer.public_key,
                },
                signers=[authority, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key,
                            new_account_pubkey=buffer.public_key,
                            space=size,
                            lamports=lamports,
                            program_id=program.program_id,
                        )
                    )
                ],
            )
        )

    async def add_job(self, job: JobAccount, authority: Optional[Keypair] = None) -> TransactionSignature:
        """RPC to add a new job to an aggregator to be performed on feed updates.

        Args:
            job (JobAccount): specifies another job for this aggregator to fulfill on update
            authority (Keypair | None): defaults to this account's keypair
        Returns:
            TransactionSignature
        """
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_add_job'](
            {"params": None},
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "job": job.public_key,
                },
                signers=[authority],
            ),
        )

    async def lock(self, authority: Optional[Keypair] = None) -> TransactionSignature:
        """Prevent new jobs from being added to the feed.

        Args:
            authority (Keypair | None): the current authority keypair
        Returns:
            TransactionSignature
        """
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_lock'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority],
            )
        )

    async def set_authority(self, new_authority: Keypair, current_authority: Optional[Keypair] = None) -> TransactionSignature:
        """Change the aggregator authority.

        Args:
            new_authority (Keypair): the new authority
            current_authority (Keypair | None): the current authority keypair
        Returns:
            TransactionSignature
        """
        current_authority = current_authority or self.keypair
        # Original used the misspelled RPC key 'aggregator_set_authoirty'
        # (KeyError at call time) and passed the Keypair object itself as the
        # new_authority account instead of its public key.
        return await self.program.rpc['aggregator_set_authority'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "new_authority": new_authority.public_key,
                    "authority": current_authority.public_key,
                },
                signers=[current_authority],
            )
        )

    async def remove_job(self, job: JobAccount, authority: Optional[Keypair] = None) -> TransactionSignature:
        """RPC to remove a job from an aggregator.

        Args:
            job (JobAccount): specifies the job to remove
            authority (Keypair | None): defaults to this account's keypair
        Returns:
            TransactionSignature
        """
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_remove_job'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "job": job.public_key,
                },
                signers=[authority],
            )
        )

    async def get_oracle_index(self, oracle_pubkey: PublicKey) -> int:
        """Get the index of an oracle in the aggregator's current round.

        Args:
            oracle_pubkey (PublicKey): public key belonging to the oracle
        Returns:
            int: index of the oracle, -1 if not found
        """
        aggregator = await self.load_data()
        for i, curr_oracle_pubkey in enumerate(aggregator.current_round.oracle_pubkeys_data):
            if curr_oracle_pubkey == oracle_pubkey:
                return i
        return -1

    async def save_result(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams) -> TransactionSignature:
        """Save an aggregator result (send the save-result transaction).

        The original defined this as a second `remove_job`, silently shadowing
        the real remove_job method; renamed per its documented purpose.

        Args:
            aggregator (Any): Aggregator data
            oracle_account (OracleAccount)
            params (AggregatorSaveResultParams)
        Returns:
            TransactionSignature
        """
        return await self.program.provider.send(
            tx=await self.save_result_txn(aggregator, oracle_account, params)
        )

    async def save_result_txn(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams):
        """Build the transaction for an oracle to save a result to an aggregator round.

        Args:
            aggregator (Any): Aggregator data
            oracle_account (OracleAccount)
            params (AggregatorSaveResultParams)
        Returns:
            the unsent transaction for aggregator_save_result
        """
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        remaining_accounts: list[PublicKey] = []
        for i in range(aggregator.oracle_request_batch_size):
            remaining_accounts.append(aggregator.current_round.oracle_pubkeys_data[i])
        for oracle in params.oracles:
            # Original used .push (JS-ism) — AttributeError on list.
            remaining_accounts.append(oracle.token_account)
        queue_pubkey = aggregator.queue_pubkey
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey))
        lease_account, lease_bump = LeaseAccount.from_seed(
            self.program,
            queue_account,
            self,
        )
        escrow = get_associated_token_address(lease_account.public_key, params.token_mint)
        feed_permission_account, feed_permission_bump = PermissionAccount.from_seed(
            self.program,
            params.queue_authority,
            queue_account.public_key,
            self.public_key,
        )
        oracle_permission_account, oracle_permission_bump = PermissionAccount.from_seed(
            self.program,
            params.queue_authority,
            queue_account.public_key,
            oracle_account.public_key,
        )
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        # produce_job_hash is synchronous; the original awaited it (TypeError).
        digest = self.produce_job_hash(params.jobs).digest()
        history_buffer = aggregator.history_buffer
        # When no history buffer exists, the program expects the aggregator
        # pubkey in its place.
        if history_buffer == PublicKey('11111111111111111111111111111111'):
            history_buffer = self.public_key
        return self.program.transaction['aggregator_save_result'](
            {
                "oracle_idx": params.oracle_idx,
                "error": params.error,
                "value": SwitchboardDecimal.from_decimal(params.value).as_proper_sbd(self.program),
                "jobs_checksum": digest,
                "min_response": SwitchboardDecimal.from_decimal(params.min_response).as_proper_sbd(self.program),
                "max_response": SwitchboardDecimal.from_decimal(params.max_response).as_proper_sbd(self.program),
                "feed_permission_bump": feed_permission_bump,
                "oracle_permission_bump": oracle_permission_bump,
                "lease_bump": lease_bump,
                "state_bump": state_bump,
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "oracle": oracle_account.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "oracle_queue": queue_account.public_key,
                    "feed_permission": feed_permission_account.public_key,
                    "oracle_permission": oracle_permission_account.public_key,
                    "lease": lease_account.public_key,
                    "escrow": escrow,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": program_state_account.public_key,
                    "history_buffer": history_buffer,
                },
                remaining_accounts=[
                    {"is_signer": False, "is_writable": True, "pubkey": pubkey}
                    for pubkey in remaining_accounts
                ],
            ),
        )