switchboardpy
The Switchboard Python v2 Wrapper.
View Source
"""The Switchboard Python v2 Wrapper.""" from switchboardpy.aggregator import ( AggregatorAccount, AggregatorHistoryRow, AggregatorInitParams, AggregatorOpenRoundParams, AggregatorSaveResultParams, AggregatorSetHistoryBufferParams, ) from switchboardpy.compiled import OracleJob from switchboardpy.common import SBV2_DEVNET_PID, AccountParams, SwitchboardDecimal from switchboardpy.crank import CrankAccount, CrankPopParams, CrankInitParams, CrankPushParams, CrankRow from switchboardpy.job import JobAccount, JobInitParams from switchboardpy.lease import LeaseAccount, LeaseExtendParams, LeaseInitParams, LeaseWithdrawParams from switchboardpy.oracle import OracleAccount, OracleInitParams, OracleWithdrawParams from switchboardpy.oraclequeue import OracleQueueAccount, OracleQueueInitParams from switchboardpy.permission import PermissionAccount, PermissionInitParams, PermissionSetParams from switchboardpy.program import ProgramStateAccount, ProgramInitParams, VaultTransferParams __all__ = [ "AccountParams", "AggregatorAccount", "AggregatorHistoryRow", "AggregatorInitParams", "AggregatorOpenRoundParams", "AggregatorSaveResultParams", "AggregatorSetHistoryBufferParams", "CrankAccount", "CrankPopParams", "CrankInitParams", "CrankPushParams", "CrankRow", "JobAccount", "JobInitParams", "LeaseAccount", "LeaseExtendParams", "LeaseInitParams", "LeaseWithdrawParams", "OracleAccount", "OracleInitParams", "OracleWithdrawParams", "OracleQueueAccount", "OracleQueueInitParams", "OracleJob", "PermissionAccount", "PermissionInitParams", "PermissionSetParams", "ProgramStateAccount", "ProgramInitParams", "VaultTransferParams", "SwitchboardDecimal", "readRawVarint32", "readDelimitedFrom" ]
View Source
@dataclass class AccountParams: """program referencing the Switchboard program and IDL.""" program: anchorpy.Program """ Public key of the account being referenced. This will always be populated within the account wrapper. """ public_key: PublicKey = None """Keypair of the account being referenced. This may not always be populated.""" keypair: Keypair = None
program referencing the Switchboard program and IDL.
Public key of the account being referenced. This will always be populated within the account wrapper.
Keypair of the account being referenced. This may not always be populated.
View Source
class AggregatorAccount: """AggregatorAccount is the wrapper for an Aggregator, the structure for that keeps aggregated feed data / metadata. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Get name of an aggregator. Args: aggregator (Any): Anchor-loaded aggregator Returns: name string of the aggregator """ @staticmethod def get_name(aggregator: Any) -> str: return ''.join(map(chr, *aggregator.name)).decode("utf-8").replace(u"\u0000", "*").encode("utf-8") """ Load and parse AggregatorAccount state based on the program IDL. Returns: name (AggregatorAccount): data parsed in accordance with the Switchboard IDL. Args: Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await AggregatorAccountData.fetch(self.program.provider.connection, self.public_key) """ Get AggregatorAccount historical data Returns: name (AggregatorAccount): data parsed in accordance with the Switchboard IDL. Args: aggregator (Any): Optional aggregator Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_history(self, aggregator: Any = None) -> Any: # if aggregator data passed in - use that, else load this aggregator aggregator = aggregator if aggregator else await self.load_data() # Compare History Buffer to default public key (zeroed out) if (aggregator.history_buffer == 11111111111111111111111111111111): return [] # Fixed AggregatorHistoryRow size ROW_SIZE = 28 # Get account data info = await self.program.provider.connection.get_account_info(aggregator.history_buffer) buffer = info.data if info else [] if not buffer or buffer.length < 12: return [] # Read UInt32 as a Little Endian val, starting at position 8 insert_idx: int = struct.unpack_from("<L", buffer, 8)[0] * ROW_SIZE front = [] tail = [] if not isinstance(buffer, list): return [] for i in range(13, buffer.length, ROW_SIZE): if i + ROW_SIZE > buffer.length: break row = AggregatorHistoryRow.from_buffer(buffer) if row.timestamp == 0: break if i <= insert_idx: tail.append(row) else: front.append(row) return front.extend(tail) """ Get the latest confirmed value stored in the aggregator account. Args: aggregator (Any): Optional aggregator value to pass in Returns: value (Decimal): the latest feed value Raises: ValueError: If the aggregator currently holds no value AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def get_latest_value(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') return SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result) """ Get the timestamp latest confirmed round stored in the aggregator account. Args: aggregator (Any): Optional aggregator value to pass in Returns: timestamp (str): latest feed timestamp as hex string Raises: ValueError: If the aggregator currently holds no value AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def get_latest_feed_timestamp(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') return aggregator.latest_confirmed_round.round_open_timestamp """ Get name of an aggregator. Args: aggregator (any): Anchor-loaded aggregator Returns: name string of the aggregator """ @staticmethod def should_report_value(value: Decimal, aggregator: Optional[Any] = None) -> bool: if aggregator.latestConfirmedRound and aggregator.latest_confirmed_round.num_success == 0: return True timestamp = round(int(time.time()) / 1000) if aggregator.start_after > timestamp: return False variance_threshold = SwitchboardDecimal.sbd_to_decimal(aggregator.variance_threshold) latest_result = SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result) force_report_period = aggregator.force_report_period last_timestamp = aggregator.latest_confirmed_round.round_open_timestamp if last_timestamp + force_report_period < timestamp: return True diff = latest_result / value if abs(diff) > 1: diff = value / latest_result if diff < 0: return True change_percentage = 1 - diff * 100 return change_percentage > variance_threshold """ Get the individual oracle results of the latest confirmed round. Args: aggregator (Any): Optional aggregator value to pass in Returns: timestamp (str): latest feed timestamp as hex string Raises: ValueError: If aggregator currently holds no value. """ async def get_confirmed_round_results(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') results: list[Any] = [] for i in range(aggregator.oracle_request_batch_size): if aggregator.latest_confirmed_round.medians_filfilled[i]: results.append({ "oracle_account": OracleAccount(AccountParams(program=self.program, public_key=aggregator.latest_confirmed_round.oracle_pubkeys_data[i])), "value": SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.medians_data[i]) }) return results """ Get the hash of a list of OracleJobs Args: jobs (list[OracleJob]): list of jobs to hash Returns: hash (_Hash): hash as hex string Raises: """ @staticmethod def produce_job_hash(jobs: list[OracleJob]): hash = hashlib.sha256() for job in jobs: job_hasher = hashlib.sha256() job_hasher.update(job.SerializeToString()) hash.update(job_hasher.digest()) return hash """ Load and deserialize all jobs stored in this aggregator Args: aggregator (Any): Optional aggregator Returns: jobs (list[{ "job": OracleJob, "public_key": PublicKey, "account": JobAccountData }]) Raises: ValueError: Failed to load feed jobs. AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_jobs(self, aggregator: Optional[Any] = None) -> Decimal: coder = anchorpy.AccountsCoder(self.program.idl) aggregator = aggregator if aggregator else await self.load_data() job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size], 10, Confirmed) if not job_accounts_raw: raise ValueError('Failed to load feed jobs.') # Deserialize OracleJob objects from each decoded JobAccountData return [AggregatorLoadedJob(parseOracleJob(coder.decode(job.account.data).data), job.pubkey, coder.decode(job.account.data)) for job in job_accounts_raw] """ Load all job hashes for each job stored in this aggregator Args: aggregator (Any): Optional aggregator Returns: hashes (list[str]): hashes for each job Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_hashes(self, aggregator: Optional[Any] = None) -> Decimal: coder = anchorpy.AccountsCoder(self.program.idl) aggregator = aggregator if aggregator else await self.loadData() job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size]) if not job_accounts_raw: raise ValueError('Failed to load feed jobs.') # get hashes from each decoded JobAccountData return [coder.decode(job.account.data).hash for job in job_accounts_raw] """ Get the size of an AggregatorAccount on chain Returns: int: size of the AggregatorAccount on chain """ def size(self): return self.program.account["AggregatorAccountData"].size """ Create and initialize the AggregatorAccount. Args: program (anchorpy.Program): Switchboard program representation holding connection and IDL params (AggregatorInitParams): init params for the aggregator Returns: AggregatorAccount """ @staticmethod async def create(program: anchorpy.Program, aggregator_init_params: AggregatorInitParams): aggregator_account = aggregator_init_params.keypair or Keypair.generate() authority = aggregator_init_params.authority or aggregator_account.public_key size = program.account["AggregatorAccountData"].size state_account, state_bump = ProgramStateAccount.from_seed(program) state = await state_account.load_data() response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] zero_decimal = SwitchboardDecimal(0, 0).as_proper_sbd(program) await program.rpc["aggregator_init"]( { "name": aggregator_init_params.name or bytes([0] * 32), "metadata": aggregator_init_params.metadata or bytes([0] * 128), "batch_size": aggregator_init_params.batch_size, "min_oracle_results": aggregator_init_params.min_required_oracle_results, "min_job_results": aggregator_init_params.min_required_job_results, "min_update_delay_seconds": aggregator_init_params.min_update_delay_seconds, "variance_threshold": SwitchboardDecimal.from_decimal(aggregator_init_params.variance_threshold).as_proper_sbd(program) if aggregator_init_params.variance_threshold else zero_decimal, "force_report_period": aggregator_init_params.force_report_period or 0, "expiration": aggregator_init_params.expiration or 0, "state_bump": state_bump, "disable_crank": aggregator_init_params.disable_crank or False, "start_after": aggregator_init_params.start_after or 0, }, ctx=anchorpy.Context( accounts={ "aggregator": aggregator_account.public_key, "authority": authority, "queue": aggregator_init_params.queue_account.public_key, "author_wallet": aggregator_init_params.author_wallet or state.token_vault, "program_state": state_account.public_key }, signers=[aggregator_account], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=aggregator_account.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return AggregatorAccount(AccountParams(program=program, keypair=aggregator_account)) """ Create and set a history buffer for the aggregator Args: program (anchorpy.Program): Switchboard program representation holding connection and IDL params (AggregatorSetHistoryBufferParams) Returns: TransactionSignature """ async def set_history_buffer(self, params: AggregatorSetHistoryBufferParams): buffer = Keypair.generate() program = self.program authority = params.authority or self.keypair HISTORY_ROW_SIZE = 28 INSERT_IDX_SIZE = 4 DISCRIMINATOR_SIZE = 8 size = params.size * HISTORY_ROW_SIZE + INSERT_IDX_SIZE + DISCRIMINATOR_SIZE response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] await program.rpc["aggregator_set_history_buffer"]( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, "buffer": buffer.public_key }, signers=[authority, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, space=size, lamports=lamports, program_id=program.program_id ) ) ] ) ) """ Open round on aggregator to get an update Args: program (anchorpy.Program): Switchboard program representation holding connection and IDL params (AggregatorOpenRoundParams) Returns: TransactionSignature """ async def open_round(self, params: AggregatorOpenRoundParams): program = self.program state_account, state_bump = ProgramStateAccount.from_seed(program) queue = await params.oracle_queue_account.load_data() lease_account, lease_bump = LeaseAccount.from_seed( self.program, params.oracle_queue_account, self ) lease = await lease_account.load_data() permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue.authority, params.oracle_queue_account.public_key, self.public_key ) return await program.rpc["aggregator_open_round"]( { "state_bump": state_bump, "lease_bump": lease_bump, "permission_bump": permission_bump, "jitter": params.jitter or 0 }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "lease": lease_account.public_key, "oracle_queue": params.oracle_queue_account.public_key, "queue_authority": queue.authority, "permission": permission_account.public_key, "escrow": lease.escrow, "program_state": state_account.public_key, "payout_wallet": params.payout_wallet, "token_program": TOKEN_PROGRAM_ID, "data_buffer": queue.data_buffer, "mint": (await params.oracle_queue_account.load_mint()).pubkey, }, ) ) """ Set min jobs sets the min jobs parameter. This is a suggestion to oracles of the number of jobs that must resolve for a job to be considered valid. Args: params (AggregatorSetMinJobsParams): parameters pecifying the min jobs that must respond Returns: TransactionSignature """ async def set_min_jobs(self, params: AggregatorSetMinJobsParams): authority = authority or self.keypair return await self.program.rpc['aggregator_set_min_jobs']( { "min_job_results": params.min_job_results }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ Set min oracles sets the min oracles parameter. This will determine how many oracles need to come back with a valid response for a result to be accepted. Args: params (AggregatorSetMinOraclesParams): parameters pecifying the min jobs that must respond Returns: TransactionSignature """ async def set_min_jobs(self, params: AggregatorSetMinJobsParams): authority = authority or self.keypair return await self.program.rpc['aggregator_set_min_jobs']( { "min_job_results": params.min_job_results }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ RPC to add a new job to an aggregtor to be performed on feed updates. Args: job (JobAccount): specifying another job for this aggregator to fulfill on update authority (Keypair | None) Returns: TransactionSignature """ async def add_job(self, job: JobAccount, weight: int = 0, authority: Optional[Keypair] = None) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_add_job']( { "weight": weight }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, "job": job.public_key }, signers=[authority] ) ) """ RPC Set batch size / the number of oracles that'll respond to updates Args: params (AggregatorSetBatchSizeParams) Returns: TransactionSignature """ async def set_batch_size(self, params: AggregatorSetBatchSizeParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_batch_size']( { "batch_size": params.batch_size }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ RPC set variance threshold (only write updates when response is > variance threshold %) Args: params (AggregatorSetVarianceThresholdParams) Returns: TransactionSignature """ async def set_variance_threshold(self, params: AggregatorSetVarianceThresholdParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_variance_threshold']( { "variance_threshold": SwitchboardDecimal.from_decimal(params.threshold) }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ RPC set min oracles Args: params (AggregatorSetMinOraclesParams) Returns: TransactionSignature """ async def set_min_oracles(self, params: AggregatorSetMinOraclesParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_min_oracles']( { "min_oracle_results": params.min_oracle_results }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ RPC set update interval Args: params (AggregatorSetUpdateIntervalParams) Returns: TransactionSignature """ async def set_update_interval(self, params: AggregatorSetUpdateIntervalParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_update_interval']( { "new_interval": params.new_interval }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ Prevent new jobs from being added to the feed. Args: authority (Keypair | None): the current authority keypair Returns: TransactionSignature """ async def lock(self, authority: Optional[Keypair] = None) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_lock']( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) ) """ Change the aggregator authority Args: new_authority (Keypair): The new authority current_authority (Keypair | None): the current authority keypair Returns: TransactionSignature """ async def set_authority(self, new_authority: Keypair, current_authority: Optional[Keypair] = None) -> TransactionSignature: current_authority = current_authority or self.keypair return await self.program.rpc['aggregator_set_authoirty']( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "new_authority": new_authority, "authority": current_authority.public_key, }, signers=[current_authority] ) ) """ RPC to add remove job from an aggregtor. Args: job (JobAccount): specifying job to remove authority (Keypair | None) Returns: TransactionSignature """ async def remove_job(self, job: JobAccount, authority: Optional[Keypair] = None) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_remove_job']( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, "job": job.public_key }, signers=[authority] ) ) """ Get Index of Oracle in Aggregator Args: oracle_pubkey (PublicKey): Public key belonging to the oracle Returns: int: index of the oracle, -1 if not found """ async def get_oracle_index(self, oracle_pubkey: PublicKey): aggregator = await self.load_data() for i, curr_oracle_pubkey in enumerate(aggregator.current_round.oracle_pubkeys_data): if curr_oracle_pubkey == oracle_pubkey: return i return -1 """ Save Aggregator result Args: aggregator (Any): Aggregator data oracle_account (OracleAccount) params (AggregatorSaveResultParams) Returns: TransactionSignature """ async def remove_job(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams) -> TransactionSignature: return await self.program.provider.send( tx=( await self.save_result_txn( aggregator, oracle_account, params ) ) ) """ RPC call for an oracle to save a result to an aggregator round. Args: aggregator (Any): Aggregator data oracle_account (OracleAccount) params (AggregatorSaveResultParams) Returns: TransactionSignature """ async def save_result_txn(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) remaining_accounts: list[PublicKey] = [] for i in range(aggregator.oracle_request_batch_size): remaining_accounts.append(aggregator.current_round.oracle_pubkeys_data[i]) for oracle in params.oracles: remaining_accounts.push(oracle.token_account) queue_pubkey = aggregator.queue_pubkey queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey)) lease_account, lease_bump = LeaseAccount.from_seed( self.program, queue_account, self ) escrow = get_associated_token_address(lease_account.public_key, params.token_mint) feed_permission_account, feed_permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, queue_account.public_key, self.public_key ) oracle_permission_account, oracle_permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, queue_account.public_key, oracle_account.public_key ) program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) digest = await self.produce_job_hash(params.jobs).digest() history_buffer = aggregator.history_buffer if history_buffer == PublicKey('11111111111111111111111111111111'): history_buffer = self.public_key return self.program.transaction['aggregator_save_result']( { "oracle_idx": params.oracle_idx, "error": params.error, "value": SwitchboardDecimal.from_decimal(params.value).as_proper_sbd(self.program), "jobs_checksum": digest, "min_response": SwitchboardDecimal.from_decimal(params.min_response).as_proper_sbd(self.program), "max_response": SwitchboardDecimal.from_decimal(params.max_response).as_proper_sbd(self.program), "feed_permission_bump": feed_permission_bump, "oracle_permission_bump": oracle_permission_bump, "lease_bump": lease_bump, "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "oracle": oracle_account.public_key, "oracle_authority": payer_keypair.public_key, "oracle_queue": queue_account.public_key, "feed_permission": feed_permission_account.public_key, "oracle_permission": oracle_permission_account.public_key, "lease": lease_account.public_key, "escrow": escrow, "token_program": TOKEN_PROGRAM_ID, "program_state": program_state_account.public_key, "history_buffer": history_buffer, "mint": params.token_mint }, remaining_accounts=[{"is_signer": False, "is_writable": True, "pubkey": pubkey} for pubkey in remaining_accounts] ) )
AggregatorAccount is the wrapper for an Aggregator, the structure for that keeps aggregated feed data / metadata.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Get name of an aggregator.
Args: aggregator (Any): Anchor-loaded aggregator
Returns: name string of the aggregator
View Source
@staticmethod def get_name(aggregator: Any) -> str: return ''.join(map(chr, *aggregator.name)).decode("utf-8").replace(u"\u0000", "*").encode("utf-8")
View Source
async def load_data(self): return await AggregatorAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
async def load_history(self, aggregator: Any = None) -> Any: # if aggregator data passed in - use that, else load this aggregator aggregator = aggregator if aggregator else await self.load_data() # Compare History Buffer to default public key (zeroed out) if (aggregator.history_buffer == 11111111111111111111111111111111): return [] # Fixed AggregatorHistoryRow size ROW_SIZE = 28 # Get account data info = await self.program.provider.connection.get_account_info(aggregator.history_buffer) buffer = info.data if info else [] if not buffer or buffer.length < 12: return [] # Read UInt32 as a Little Endian val, starting at position 8 insert_idx: int = struct.unpack_from("<L", buffer, 8)[0] * ROW_SIZE front = [] tail = [] if not isinstance(buffer, list): return [] for i in range(13, buffer.length, ROW_SIZE): if i + ROW_SIZE > buffer.length: break row = AggregatorHistoryRow.from_buffer(buffer) if row.timestamp == 0: break if i <= insert_idx: tail.append(row) else: front.append(row) return front.extend(tail)
View Source
async def get_latest_value(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') return SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result)
View Source
async def get_latest_feed_timestamp(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') return aggregator.latest_confirmed_round.round_open_timestamp
View Source
@staticmethod def should_report_value(value: Decimal, aggregator: Optional[Any] = None) -> bool: if aggregator.latestConfirmedRound and aggregator.latest_confirmed_round.num_success == 0: return True timestamp = round(int(time.time()) / 1000) if aggregator.start_after > timestamp: return False variance_threshold = SwitchboardDecimal.sbd_to_decimal(aggregator.variance_threshold) latest_result = SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result) force_report_period = aggregator.force_report_period last_timestamp = aggregator.latest_confirmed_round.round_open_timestamp if last_timestamp + force_report_period < timestamp: return True diff = latest_result / value if abs(diff) > 1: diff = value / latest_result if diff < 0: return True change_percentage = 1 - diff * 100 return change_percentage > variance_threshold
View Source
async def get_confirmed_round_results(self, aggregator: Optional[Any] = None) -> Decimal: aggregator = aggregator if aggregator else await self.load_data() if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0: raise ValueError('Aggregator currently holds no value.') results: list[Any] = [] for i in range(aggregator.oracle_request_batch_size): if aggregator.latest_confirmed_round.medians_filfilled[i]: results.append({ "oracle_account": OracleAccount(AccountParams(program=self.program, public_key=aggregator.latest_confirmed_round.oracle_pubkeys_data[i])), "value": SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.medians_data[i]) }) return results
View Source
@staticmethod def produce_job_hash(jobs: list[OracleJob]): hash = hashlib.sha256() for job in jobs: job_hasher = hashlib.sha256() job_hasher.update(job.SerializeToString()) hash.update(job_hasher.digest()) return hash
View Source
async def load_jobs(self, aggregator: Optional[Any] = None) -> Decimal: coder = anchorpy.AccountsCoder(self.program.idl) aggregator = aggregator if aggregator else await self.load_data() job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size], 10, Confirmed) if not job_accounts_raw: raise ValueError('Failed to load feed jobs.') # Deserialize OracleJob objects from each decoded JobAccountData return [AggregatorLoadedJob(parseOracleJob(coder.decode(job.account.data).data), job.pubkey, coder.decode(job.account.data)) for job in job_accounts_raw]
View Source
async def load_hashes(self, aggregator: Optional[Any] = None) -> Decimal: coder = anchorpy.AccountsCoder(self.program.idl) aggregator = aggregator if aggregator else await self.loadData() job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size]) if not job_accounts_raw: raise ValueError('Failed to load feed jobs.') # get hashes from each decoded JobAccountData return [coder.decode(job.account.data).hash for job in job_accounts_raw]
View Source
def size(self): return self.program.account["AggregatorAccountData"].size
View Source
@staticmethod async def create(program: anchorpy.Program, aggregator_init_params: AggregatorInitParams): aggregator_account = aggregator_init_params.keypair or Keypair.generate() authority = aggregator_init_params.authority or aggregator_account.public_key size = program.account["AggregatorAccountData"].size state_account, state_bump = ProgramStateAccount.from_seed(program) state = await state_account.load_data() response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] zero_decimal = SwitchboardDecimal(0, 0).as_proper_sbd(program) await program.rpc["aggregator_init"]( { "name": aggregator_init_params.name or bytes([0] * 32), "metadata": aggregator_init_params.metadata or bytes([0] * 128), "batch_size": aggregator_init_params.batch_size, "min_oracle_results": aggregator_init_params.min_required_oracle_results, "min_job_results": aggregator_init_params.min_required_job_results, "min_update_delay_seconds": aggregator_init_params.min_update_delay_seconds, "variance_threshold": SwitchboardDecimal.from_decimal(aggregator_init_params.variance_threshold).as_proper_sbd(program) if aggregator_init_params.variance_threshold else zero_decimal, "force_report_period": aggregator_init_params.force_report_period or 0, "expiration": aggregator_init_params.expiration or 0, "state_bump": state_bump, "disable_crank": aggregator_init_params.disable_crank or False, "start_after": aggregator_init_params.start_after or 0, }, ctx=anchorpy.Context( accounts={ "aggregator": aggregator_account.public_key, "authority": authority, "queue": aggregator_init_params.queue_account.public_key, "author_wallet": aggregator_init_params.author_wallet or state.token_vault, "program_state": state_account.public_key }, signers=[aggregator_account], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=aggregator_account.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return AggregatorAccount(AccountParams(program=program, keypair=aggregator_account))
View Source
async def set_history_buffer(self, params: AggregatorSetHistoryBufferParams): buffer = Keypair.generate() program = self.program authority = params.authority or self.keypair HISTORY_ROW_SIZE = 28 INSERT_IDX_SIZE = 4 DISCRIMINATOR_SIZE = 8 size = params.size * HISTORY_ROW_SIZE + INSERT_IDX_SIZE + DISCRIMINATOR_SIZE response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] await program.rpc["aggregator_set_history_buffer"]( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, "buffer": buffer.public_key }, signers=[authority, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, space=size, lamports=lamports, program_id=program.program_id ) ) ] ) )
View Source
async def open_round(self, params: AggregatorOpenRoundParams): program = self.program state_account, state_bump = ProgramStateAccount.from_seed(program) queue = await params.oracle_queue_account.load_data() lease_account, lease_bump = LeaseAccount.from_seed( self.program, params.oracle_queue_account, self ) lease = await lease_account.load_data() permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue.authority, params.oracle_queue_account.public_key, self.public_key ) return await program.rpc["aggregator_open_round"]( { "state_bump": state_bump, "lease_bump": lease_bump, "permission_bump": permission_bump, "jitter": params.jitter or 0 }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "lease": lease_account.public_key, "oracle_queue": params.oracle_queue_account.public_key, "queue_authority": queue.authority, "permission": permission_account.public_key, "escrow": lease.escrow, "program_state": state_account.public_key, "payout_wallet": params.payout_wallet, "token_program": TOKEN_PROGRAM_ID, "data_buffer": queue.data_buffer, "mint": (await params.oracle_queue_account.load_mint()).pubkey, }, ) )
View Source
async def set_min_jobs(self, params: AggregatorSetMinJobsParams): authority = authority or self.keypair return await self.program.rpc['aggregator_set_min_jobs']( { "min_job_results": params.min_job_results }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def add_job(self, job: JobAccount, weight: int = 0, authority: Optional[Keypair] = None) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_add_job']( { "weight": weight }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, "job": job.public_key }, signers=[authority] ) )
View Source
async def set_batch_size(self, params: AggregatorSetBatchSizeParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_batch_size']( { "batch_size": params.batch_size }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def set_variance_threshold(self, params: AggregatorSetVarianceThresholdParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_variance_threshold']( { "variance_threshold": SwitchboardDecimal.from_decimal(params.threshold) }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def set_min_oracles(self, params: AggregatorSetMinOraclesParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_min_oracles']( { "min_oracle_results": params.min_oracle_results }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def set_update_interval(self, params: AggregatorSetUpdateIntervalParams) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_set_update_interval']( { "new_interval": params.new_interval }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def lock(self, authority: Optional[Keypair] = None) -> TransactionSignature: authority = authority or self.keypair return await self.program.rpc['aggregator_lock']( ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "authority": authority.public_key, }, signers=[authority] ) )
View Source
async def remove_job(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams) -> TransactionSignature: return await self.program.provider.send( tx=( await self.save_result_txn( aggregator, oracle_account, params ) ) )
View Source
async def get_oracle_index(self, oracle_pubkey: PublicKey): aggregator = await self.load_data() for i, curr_oracle_pubkey in enumerate(aggregator.current_round.oracle_pubkeys_data): if curr_oracle_pubkey == oracle_pubkey: return i return -1
View Source
async def save_result_txn(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) remaining_accounts: list[PublicKey] = [] for i in range(aggregator.oracle_request_batch_size): remaining_accounts.append(aggregator.current_round.oracle_pubkeys_data[i]) for oracle in params.oracles: remaining_accounts.push(oracle.token_account) queue_pubkey = aggregator.queue_pubkey queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey)) lease_account, lease_bump = LeaseAccount.from_seed( self.program, queue_account, self ) escrow = get_associated_token_address(lease_account.public_key, params.token_mint) feed_permission_account, feed_permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, queue_account.public_key, self.public_key ) oracle_permission_account, oracle_permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, queue_account.public_key, oracle_account.public_key ) program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) digest = await self.produce_job_hash(params.jobs).digest() history_buffer = aggregator.history_buffer if history_buffer == PublicKey('11111111111111111111111111111111'): history_buffer = self.public_key return self.program.transaction['aggregator_save_result']( { "oracle_idx": params.oracle_idx, "error": params.error, "value": SwitchboardDecimal.from_decimal(params.value).as_proper_sbd(self.program), "jobs_checksum": digest, "min_response": SwitchboardDecimal.from_decimal(params.min_response).as_proper_sbd(self.program), "max_response": SwitchboardDecimal.from_decimal(params.max_response).as_proper_sbd(self.program), "feed_permission_bump": feed_permission_bump, "oracle_permission_bump": oracle_permission_bump, "lease_bump": lease_bump, "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "aggregator": self.public_key, "oracle": oracle_account.public_key, "oracle_authority": payer_keypair.public_key, "oracle_queue": queue_account.public_key, "feed_permission": feed_permission_account.public_key, "oracle_permission": oracle_permission_account.public_key, "lease": lease_account.public_key, "escrow": escrow, "token_program": TOKEN_PROGRAM_ID, "program_state": program_state_account.public_key, "history_buffer": history_buffer, "mint": params.token_mint }, remaining_accounts=[{"is_signer": False, "is_writable": True, "pubkey": pubkey} for pubkey in remaining_accounts] ) )
View Source
@dataclass class AggregatorHistoryRow: """AggregatorHistoryRow is a wrapper for the row structure of elements in the aggregator history buffer. Attributes: timestamp (int): timestamp of the aggregator result value (Decimal): Aggregator value at the timestamp """ timestamp: int value: Decimal """ Generate an AggregatorHistoryRow from a retrieved buffer representation Args: buf (list): Anchor-loaded buffer representation of AggregatorHistoryRow Returns: AggregatorHistoryRow """ @staticmethod def from_buffer(buf: bytes): timestamp: int = struct.unpack_from("<L", buf[:8])[0] mantissa: int = struct.unpack_from("<L", buf[8:24])[0] scale: int = struct.unpack_from("<L", buf, 24)[0] decimal = SwitchboardDecimal.sbd_to_decimal({"mantissa": mantissa, "scale": scale}) res = AggregatorHistoryRow(timestamp, decimal) return res
AggregatorHistoryRow is a wrapper for the row structure of elements in the aggregator history buffer.
Attributes: timestamp (int): timestamp of the aggregator result value (Decimal): Aggregator value at the timestamp
Generate an AggregatorHistoryRow from a retrieved buffer representation
Args: buf (list): Anchor-loaded buffer representation of AggregatorHistoryRow
Returns: AggregatorHistoryRow
View Source
@staticmethod def from_buffer(buf: bytes): timestamp: int = struct.unpack_from("<L", buf[:8])[0] mantissa: int = struct.unpack_from("<L", buf[8:24])[0] scale: int = struct.unpack_from("<L", buf, 24)[0] decimal = SwitchboardDecimal.sbd_to_decimal({"mantissa": mantissa, "scale": scale}) res = AggregatorHistoryRow(timestamp, decimal) return res
View Source
@dataclass class AggregatorInitParams: """Number of oracles to request on aggregator update.""" batch_size: int """Minimum number of oracle responses required before a round is validated.""" min_required_oracle_results: int """Minimum number of seconds required between aggregator rounds.""" min_required_job_results: int """Minimum number of seconds required between aggregator rounds.""" min_update_delay_seconds: int """The queue to which this aggregator will be linked""" queue_account: OracleQueueAccount """Name of the aggregator to store on-chain.""" name: bytes = None """Metadata of the aggregator to store on-chain.""" metadata: bytes = None """unix_timestamp for which no feed update will occur before.""" start_after: int = None """ Change percentage required between a previous round and the current round. If variance percentage is not met, reject new oracle responses. """ variance_threshold: Decimal = None """ Number of seconds for which, even if the variance threshold is not passed, accept new responses from oracles. """ force_report_period: int = None """ unix_timestamp after which funds may be withdrawn from the aggregator. null/undefined/0 means the feed has no expiration. """ expiration: int = None """ An optional wallet for receiving kickbacks from job usage in feeds. Defaults to token vault. """ keypair: Keypair = None """ An optional wallet for receiving kickbacks from job usage in feeds. Defaults to token vault. """ author_wallet: PublicKey = None """ If included, this keypair will be the aggregator authority rather than the aggregator keypair. """ authority: PublicKey = None """Disable automatic updates""" disable_crank: bool = None
Number of oracles to request on aggregator update.
Minimum number of oracle responses required before a round is validated.
Minimum number of seconds required between aggregator rounds.
Minimum number of seconds required between aggregator rounds.
The queue to which this aggregator will be linked
Name of the aggregator to store on-chain.
Metadata of the aggregator to store on-chain.
unix_timestamp for which no feed update will occur before.
Change percentage required between a previous round and the current round. If variance percentage is not met, reject new oracle responses.
Number of seconds for which, even if the variance threshold is not passed, accept new responses from oracles.
unix_timestamp after which funds may be withdrawn from the aggregator. null/undefined/0 means the feed has no expiration.
An optional wallet for receiving kickbacks from job usage in feeds. Defaults to token vault.
An optional wallet for receiving kickbacks from job usage in feeds. Defaults to token vault.
View Source
@dataclass class AggregatorOpenRoundParams: """The oracle queue from which oracles are assigned this update.""" oracle_queue_account: OracleQueueAccount """The token wallet which will receive rewards for calling update on this feed.""" payout_wallet: PublicKey """ Data feeds on a crank are ordered by their next available update time with some level of jitter to mitigate oracles being assigned to the same update request upon each iteration of the queue, which makes them susceptible to a malicous oracle. """ jitter: int = None
The oracle queue from which oracles are assigned this update.
The token wallet which will receive rewards for calling update on this feed.
Data feeds on a crank are ordered by their next available update time with some level of jitter to mitigate oracles being assigned to the same update request upon each iteration of the queue, which makes them susceptible to a malicous oracle.
View Source
@dataclass class AggregatorSaveResultParams: """Index in the list of oracles in the aggregator assigned to this round update.""" oracle_idx: int """Reports that an error occured and the oracle could not send a value.""" error: bool """Value the oracle is responding with for this update.""" value: Decimal """ The minimum value this oracle has seen this round for the jobs listed in the aggregator. """ min_response: Decimal """ The maximum value this oracle has seen this round for the jobs listed in the aggregator. """ max_response: Decimal """List of OracleJobs that were performed to produce this result""" jobs: list[OracleJob] """Authority of the queue the aggregator is attached to""" queue_authority: PublicKey """Program token mint""" token_mint: PublicKey """List of parsed oracles""" oracles: list[Any]
Index in the list of oracles in the aggregator assigned to this round update.
Reports that an error occured and the oracle could not send a value.
Value the oracle is responding with for this update.
The minimum value this oracle has seen this round for the jobs listed in the aggregator.
The maximum value this oracle has seen this round for the jobs listed in the aggregator.
List of OracleJobs that were performed to produce this result
Authority of the queue the aggregator is attached to
List of parsed oracles
View Source
@dataclass class AggregatorSetHistoryBufferParams: """Number of elements for the history buffer to fit""" size: int """Authority keypair for the aggregator""" authority: Keypair = None
Number of elements for the history buffer to fit
Authority keypair for the aggregator
View Source
class CrankAccount: """ A Switchboard account representing a crank of aggregators ordered by next update time. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This crank's public key keypair (Keypair | None): this crank's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Get the size of an CrankAccount on chain Args: Returns: int: size of the CrankAccount type on chain """ def size(self): return self.program.account["CrankAccountData"].size """ Load and parse CrankAccount data based on the program IDL Args: Returns: CrankAccount Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await CrankAccountData.fetch(self.program.provider.connection, self.public_key) """ Create and initialize the CrankAccount. Args: program (anchor.Program): Switchboard program representation holding connection and IDL. params (CrankInitParams) Returns: CrankAccount """ @staticmethod async def create(program: anchorpy.Program, params: CrankInitParams): crank_account = Keypair.generate() buffer = Keypair.generate() size = program.account["CrankAccountData"].size max_rows = params.max_rows or 500 crank_size = max_rows * 40 + 8 response = await program.provider.connection.get_minimum_balance_for_rent_exemption(crank_size) lamports = response["result"] await program.rpc["crank_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 128), "crank_size": max_rows }, ctx=anchorpy.Context( accounts={ "crank": crank_account.public_key, "queue": params.queue_account.public_key, "buffer": buffer.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key }, signers=[crank_account, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return CrankAccount(AccountParams(program=program, keypair=crank_account)) """ Pushes a new aggregator onto the crank Args: params (CrankPushParams): aggregator and related data Returns: TransactionSignature """ async def push(self, params: CrankPushParams): aggregator_account: AggregatorAccount = params.aggregator_account crank = await self.load_data() queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=crank.queue_pubkey)) queue = await queue_account.load_data() queue_authority = queue.authority lease_account, lease_bump = LeaseAccount.from_seed(self.program, queue_account, aggregator_account) lease: Any = None try: lease = await lease_account.load_data() except Exception: raise ValueError('A requested lease pda account has not been initialized.') permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_authority, queue_account.public_key, aggregator_account.public_key ) try: await lease_account.load_data() except Exception: raise ValueError('A requested permission pda account has not been initialized.') program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) return await self.program.rpc["crank_push"]( { "state_bump": state_bump, "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "crank": self.public_key, "aggregator": aggregator_account.public_key, "oracle_queue": queue_account.public_key, "queue_authority": queue_authority, "permission": permission_account.public_key, "lease": lease_account.public_key, "escrow": lease.escrow, "program_state": program_state_account.public_key, "data_buffer": crank.data_buffer } ) ) """ Pops a tx from the crank. Args: params (CrankPopParams) Returns: TransactionSignature """ async def pop_txn(self, params: CrankPopParams): fail_open_on_account_mismatch = params.fail_open_on_mismatch or False next = params.ready_pubkeys or await self.peak_next_ready(5) if len(next) == 0: raise ValueError('Crank is not ready to be turned') remaining_accounts: list[PublicKey] = [] lease_bumps_map: Dict[str, int] = {} permission_bumps_map: Dict[str, int] = {} queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=params.queue_pubkey)) for row in next: aggregator_account = AggregatorAccount(AccountParams(program=self.program, public_key=row)) lease_account, lease_bump = LeaseAccount.from_seed( self.program, queue_account, aggregator_account ) permission_account, permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, params.queue_pubkey, row ) escrow = get_associated_token_address( lease_account.public_key, params.token_mint ) remaining_accounts.append(aggregator_account.public_key) remaining_accounts.append(lease_account.public_key) remaining_accounts.append(escrow) remaining_accounts.append(permission_account.public_key) lease_bumps_map[row.to_base58()] = lease_bump permission_bumps_map[row.to_base58()] = permission_bump remaining_accounts.sort(key=lambda key : bytes(key)) crank = params.crank queue = params.queue lease_bumps: list[int] = [] permission_bumps: list[int] = [] for key in remaining_accounts: lease_bumps.append(lease_bumps_map.get(key.to_base58()) or 0) permission_bumps.append(permission_bumps_map.get(key.to_base58()) or 0) program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) return self.program.transaction["crank_pop"]( { "state_bump": state_bump, "lease_bumps": bytes(lease_bumps), "permission_bumps": bytes(permission_bumps), "nonce": params.nonce or None, "fail_open_on_account_mismatch": fail_open_on_account_mismatch }, ctx=anchorpy.Context( accounts={ "crank": self.public_key, "oracle_queue": params.queue_pubkey, "queue_authority": params.queue_authority, "program_state": program_state_account.public_key, "payout_wallet": params.payout_wallet, "token_program": TOKEN_PROGRAM_ID, "crank_data_buffer": crank.data_buffer, "queue_data_buffer": queue.data_buffer }, remaining_accounts=[{ "is_signer": False, "is_writable": True, "pubkey": pubkey } for pubkey in remaining_accounts], signers=[payer_keypair] ) ) """ Pops an aggregator from the crank Args: params (CrankPopParams) Returns: TransactionSignature """ async def pop(self, params: CrankPopParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) txn = await self.pop_txn(params) return await self.program.provider.connection.send_transaction(txn, [payer_keypair]) """ Get an array of the next aggregator pubkeys to be popped from the crank, limited by n Args: n (int): limit of pubkeys to return Returns: list[CrankRow]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp """ async def peak_next_with_time(self, n: int): crank = await self.load_data() # get list slice of length pq_size pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] # sort by CrankRow next timestamp pq_data.sort(key=lambda crank_row: crank_row.next_timestamp) # return items return pq_data[:n] """ Get an array of the next readily updateable aggregator pubkeys to be popped from the crank, limited by n Args: n (Optional[int]): limit of pubkeys to return Returns: list[PublicKey]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp """ async def peak_next_ready(self, n: Optional[int] = None): now = math.floor(time.time()) crank = await self.load_data() pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] key = lambda crank_row: crank_row.next_timestamp return [item.pubkey for item in list(filter(lambda item: now >= item.next_timestamp, pq_data)).sort(key=key)[:(n or len(pq_data))]] """ Get an array of the next aggregator pubkeys to be popped from the crank, limited by n Args: n (int): limit of pubkeys to return Returns: list[PublicKey]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp """ async def peak_next(self, n: int): crank = await self.load_data() pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] pq_data.sort(key=lambda crank_row: crank_row.next_timestamp) return [item.pubkey for item in pq_data[:n]]
A Switchboard account representing a crank of aggregators ordered by next update time.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This crank's public key keypair (Keypair | None): this crank's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Get the size of an CrankAccount on chain
Args:
Returns: int: size of the CrankAccount type on chain
View Source
def size(self): return self.program.account["CrankAccountData"].size
View Source
async def load_data(self): return await CrankAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
@staticmethod async def create(program: anchorpy.Program, params: CrankInitParams): crank_account = Keypair.generate() buffer = Keypair.generate() size = program.account["CrankAccountData"].size max_rows = params.max_rows or 500 crank_size = max_rows * 40 + 8 response = await program.provider.connection.get_minimum_balance_for_rent_exemption(crank_size) lamports = response["result"] await program.rpc["crank_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 128), "crank_size": max_rows }, ctx=anchorpy.Context( accounts={ "crank": crank_account.public_key, "queue": params.queue_account.public_key, "buffer": buffer.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key }, signers=[crank_account, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return CrankAccount(AccountParams(program=program, keypair=crank_account))
View Source
async def push(self, params: CrankPushParams): aggregator_account: AggregatorAccount = params.aggregator_account crank = await self.load_data() queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=crank.queue_pubkey)) queue = await queue_account.load_data() queue_authority = queue.authority lease_account, lease_bump = LeaseAccount.from_seed(self.program, queue_account, aggregator_account) lease: Any = None try: lease = await lease_account.load_data() except Exception: raise ValueError('A requested lease pda account has not been initialized.') permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_authority, queue_account.public_key, aggregator_account.public_key ) try: await lease_account.load_data() except Exception: raise ValueError('A requested permission pda account has not been initialized.') program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) return await self.program.rpc["crank_push"]( { "state_bump": state_bump, "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "crank": self.public_key, "aggregator": aggregator_account.public_key, "oracle_queue": queue_account.public_key, "queue_authority": queue_authority, "permission": permission_account.public_key, "lease": lease_account.public_key, "escrow": lease.escrow, "program_state": program_state_account.public_key, "data_buffer": crank.data_buffer } ) )
View Source
async def pop_txn(self, params: CrankPopParams): fail_open_on_account_mismatch = params.fail_open_on_mismatch or False next = params.ready_pubkeys or await self.peak_next_ready(5) if len(next) == 0: raise ValueError('Crank is not ready to be turned') remaining_accounts: list[PublicKey] = [] lease_bumps_map: Dict[str, int] = {} permission_bumps_map: Dict[str, int] = {} queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=params.queue_pubkey)) for row in next: aggregator_account = AggregatorAccount(AccountParams(program=self.program, public_key=row)) lease_account, lease_bump = LeaseAccount.from_seed( self.program, queue_account, aggregator_account ) permission_account, permission_bump = PermissionAccount.from_seed( self.program, params.queue_authority, params.queue_pubkey, row ) escrow = get_associated_token_address( lease_account.public_key, params.token_mint ) remaining_accounts.append(aggregator_account.public_key) remaining_accounts.append(lease_account.public_key) remaining_accounts.append(escrow) remaining_accounts.append(permission_account.public_key) lease_bumps_map[row.to_base58()] = lease_bump permission_bumps_map[row.to_base58()] = permission_bump remaining_accounts.sort(key=lambda key : bytes(key)) crank = params.crank queue = params.queue lease_bumps: list[int] = [] permission_bumps: list[int] = [] for key in remaining_accounts: lease_bumps.append(lease_bumps_map.get(key.to_base58()) or 0) permission_bumps.append(permission_bumps_map.get(key.to_base58()) or 0) program_state_account, state_bump = ProgramStateAccount.from_seed(self.program) payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) return self.program.transaction["crank_pop"]( { "state_bump": state_bump, "lease_bumps": bytes(lease_bumps), "permission_bumps": bytes(permission_bumps), "nonce": params.nonce or None, "fail_open_on_account_mismatch": fail_open_on_account_mismatch }, ctx=anchorpy.Context( accounts={ "crank": self.public_key, "oracle_queue": params.queue_pubkey, "queue_authority": params.queue_authority, "program_state": program_state_account.public_key, "payout_wallet": params.payout_wallet, "token_program": TOKEN_PROGRAM_ID, "crank_data_buffer": crank.data_buffer, "queue_data_buffer": queue.data_buffer }, remaining_accounts=[{ "is_signer": False, "is_writable": True, "pubkey": pubkey } for pubkey in remaining_accounts], signers=[payer_keypair] ) )
View Source
async def pop(self, params: CrankPopParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) txn = await self.pop_txn(params) return await self.program.provider.connection.send_transaction(txn, [payer_keypair])
View Source
async def peak_next_with_time(self, n: int): crank = await self.load_data() # get list slice of length pq_size pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] # sort by CrankRow next timestamp pq_data.sort(key=lambda crank_row: crank_row.next_timestamp) # return items return pq_data[:n]
View Source
async def peak_next_ready(self, n: Optional[int] = None): now = math.floor(time.time()) crank = await self.load_data() pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] key = lambda crank_row: crank_row.next_timestamp return [item.pubkey for item in list(filter(lambda item: now >= item.next_timestamp, pq_data)).sort(key=key)[:(n or len(pq_data))]]
View Source
async def peak_next(self, n: int): crank = await self.load_data() pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size] pq_data.sort(key=lambda crank_row: crank_row.next_timestamp) return [item.pubkey for item in pq_data[:n]]
View Source
@dataclass class CrankPopParams: """Specifies the wallet to reward for turning the crank.""" payout_wallet: PublicKey """The pubkey of the linked oracle queue.""" queue_pubkey: PublicKey """The pubkey of the linked oracle queue authority.""" queue_authority: PublicKey """CrankAccount data""" crank: Any """QueueAccount data""" queue: Any """Token mint pubkey""" token_mint: PublicKey """ Array of pubkeys to attempt to pop. If discluded, this will be loaded from the crank upon calling. """ ready_pubkeys: list[PublicKey] = None """Nonce to allow consecutive crank pops with the same blockhash.""" nonce: int = None fail_open_on_mismatch: bool = None
Specifies the wallet to reward for turning the crank.
The pubkey of the linked oracle queue.
The pubkey of the linked oracle queue authority.
QueueAccount data
Token mint pubkey
Array of pubkeys to attempt to pop. If discluded, this will be loaded from the crank upon calling.
Nonce to allow consecutive crank pops with the same blockhash.
View Source
@dataclass class CrankInitParams: """OracleQueueAccount for which this crank is associated""" queue_account: OracleQueueAccount """Buffer specifying crank name""" name: bytes = None """Buffer specifying crank metadata""" metadata: bytes = None """Optional max number of rows""" max_rows: int = None
OracleQueueAccount for which this crank is associated
Buffer specifying crank name
Buffer specifying crank metadata
Optional max number of rows
View Source
@dataclass class CrankPushParams: aggregator_account: AggregatorAccount
CrankPushParams(aggregator_account: switchboardpy.AggregatorAccount)
View Source
@dataclass class CrankRow: """Aggregator account pubkey""" pubkey: PublicKey """Next aggregator update timestamp to order the crank by""" next_timestamp: int @staticmethod def from_bytes(buf: bytes): pass
Aggregator account pubkey
Next aggregator update timestamp to order the crank by
View Source
@staticmethod def from_bytes(buf: bytes): pass
View Source
class JobAccount: """ A Switchboard account representing a job for an oracle to perform, stored as a protocol buffer. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Load and parse JobAccount state based on the program IDL. Returns: name (JobAccount): data parsed in accordance with the Switchboard IDL. Args: Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await JobAccountData.fetch(self.program.provider.connection, self.public_key) """ Load and parse the protobuf from the raw buffer stored in the JobAccount. Returns: OracleJob Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_job(self): job = await self.load_data() return parseOracleJob(job.data); """ Load and parse JobAccount data based on the program IDL from a buffer. Args: program (anchorpy.Program) buf (bytes): Bytes representation of the JobAccount Returns: Any: JobAccountData parsed in accordance with the Switchboard IDL. """ @staticmethod def decode(program: anchorpy.Program, buf: bytes): coder = anchorpy.Coder(program.idl) return coder.accounts.decode(buf) """ Create and initialize the JobAccount Args: program (anchor.Program) params (JobInitParams) Returns: JobAccount """ @staticmethod async def create(program: anchorpy.Program, params: JobInitParams): job_account = params.keypair or Keypair.generate() size = 280 + len(params.data) + (''.join(params.variables) if params.variables else 0) state_account, state_bump = ProgramStateAccount.from_seed(program) state = await state_account.load_data() response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] await program.rpc["job_init"]( { "name": params.name or bytes([0] * 32), "expiration": params.expiration or 0, "data": params.data, "variables": [bytes(b'') for _ in params.variables] if params.variables else [], "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "job": job_account.public_key, "authority": params.authority or state.token_vault, "program_state": state_account.public_key }, signers=[job_account], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=job_account.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return JobAccount(AccountParams(program=program, keypair=job_account))
A Switchboard account representing a job for an oracle to perform, stored as a protocol buffer.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Load and parse JobAccount state based on the program IDL.
Returns: name (JobAccount): data parsed in accordance with the Switchboard IDL.
Args:
Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
View Source
async def load_data(self): return await JobAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
async def load_job(self): job = await self.load_data() return parseOracleJob(job.data);
View Source
@staticmethod def decode(program: anchorpy.Program, buf: bytes): coder = anchorpy.Coder(program.idl) return coder.accounts.decode(buf)
View Source
@staticmethod async def create(program: anchorpy.Program, params: JobInitParams): job_account = params.keypair or Keypair.generate() size = 280 + len(params.data) + (''.join(params.variables) if params.variables else 0) state_account, state_bump = ProgramStateAccount.from_seed(program) state = await state_account.load_data() response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size) lamports = response["result"] await program.rpc["job_init"]( { "name": params.name or bytes([0] * 32), "expiration": params.expiration or 0, "data": params.data, "variables": [bytes(b'') for _ in params.variables] if params.variables else [], "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "job": job_account.public_key, "authority": params.authority or state.token_vault, "program_state": state_account.public_key }, signers=[job_account], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=job_account.public_key, lamports=lamports, space=size, program_id=program.program_id ) ) ] ) ) return JobAccount(AccountParams(program=program, keypair=job_account))
View Source
@dataclass class JobInitParams: """A serialized protocol buffer holding the schema of the job.""" data: bytes """An optional name to apply to the job account.""" name: bytes = None """unix_timestamp of when funds can be withdrawn from this account.""" expiration: int = None """A required variables oracles must fill to complete the job.""" variables: list[str] = None """A pre-generated keypair to use.""" keypair: Keypair = None """ An optional wallet for receiving kickbacks from job usage in feeds. """ authority: PublicKey = None
A serialized protocol buffer holding the schema of the job.
An optional name to apply to the job account.
unix_timestamp of when funds can be withdrawn from this account.
A required variables oracles must fill to complete the job.
A pre-generated keypair to use.
An optional wallet for receiving kickbacks from job usage in feeds.
View Source
class LeaseAccount: """ A Switchboard account representing a lease for managing funds for oracle payouts for fulfilling feed updates. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This lease's public key keypair (Keypair | None): this lease's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Get the size of an LeaseAccount on chain Args: Returns: int: size of the LeaseAccount type on chain """ def size(self): return self.program.account["LeaseAccountData"].size """ Load and parse LeaseAccount data based on the program IDL Args: Returns: LeaseAccount Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await LeaseAccountData.fetch(self.program.provider.connection, self.public_key) """ Loads a LeaseAccount from the expected PDA seed format Args: program (anchorpy.Program) queue_account (OracleQueueAccount) aggregator_account (AggregatorAccount) Returns: Tuple[LeaseAccount, int]: LeaseAccount and PDA bump """ @staticmethod def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, aggregator_account: AggregatorAccount): pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(b'LeaseAccountData'), bytes(queue_account.public_key), bytes(aggregator_account.public_key), ], program.program_id ) return LeaseAccount(AccountParams(program=program, public_key=pubkey)), bump """ Create and initialize the LeaseAccount. Args: program (anchor.Program): Switchboard program representation holding connection and IDL. params (LeaseInitParams) Returns: LeaseAccount """ @staticmethod async def create(program: anchorpy.Program, params: LeaseInitParams): program_state_account, state_bump = ProgramStateAccount.from_seed(program) switch_token_mint = await program_state_account.get_token_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, params.oracle_queue_account, params.aggregator_account ) job_account_data = await params.aggregator_account.load_jobs() aggregator_account_data = await params.aggregator_account.load_data() job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size] job_wallets: list[PublicKey] = [] wallet_bumps: list[int] = [] for job in job_account_data: authority = job.account.authority or PublicKey('11111111111111111111111111111111') pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(authority), bytes(TOKEN_PROGRAM_ID), bytes(switch_token_mint.pubkey), ], ASSOCIATED_TOKEN_PROGRAM_ID ) job_wallets.append(pubkey) wallet_bumps.append(bump) escrow = await switch_token_mint.create_associated_token_account(lease_account.public_key, skip_confirmation=False) await program.rpc["lease_init"]( { "load_amount": params.load_amount, "state_bump": state_bump, "lease_bump": lease_bump, "withdraw_authority": params.withdraw_authority or PublicKey('11111111111111111111111111111111'), "wallet_bumps": bytes(wallet_bumps) }, ctx=anchorpy.Context( accounts={ "program_state": program_state_account.public_key, "lease": lease_account.public_key, "queue": params.oracle_queue_account.public_key, "aggregator": params.aggregator_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "funder": params.funder, "payer": program.provider.wallet.public_key, "token_program": TOKEN_PROGRAM_ID, "escrow": escrow, "owner": params.funder_authority.public_key, "mint": switch_token_mint.pubkey }, signers=[params.funder_authority], remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]] ) ) return LeaseAccount(AccountParams(program=program, public_key=lease_account.public_key)) """ Get lease balance Args: Returns: int balance """ async def get_balance(self): lease = self.load_data() return await self.program.provider.connection.get_balance(lease.escrow) """ Adds fund to a LeaseAccount. Note that funds can always be withdrawn by the withdraw authority if one was set on lease initialization. Args: program (anchor.Program): Switchboard program representation holding connection and IDL. params (LeaseExtendParams) Returns: TransactionSignature """ async def extend(self, params: LeaseExtendParams): program = self.program lease = await self.load_data() escrow = lease.escrow queue = lease.queue aggregator = lease.aggregator program_state_account, state_bump = ProgramStateAccount.from_seed(program) queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue)) switch_token_mint = await queue_account.load_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, OracleQueueAccount(AccountParams(program=program, public_key=queue)), AggregatorAccount(AccountParams(program=program, public_key=aggregator)) ) job_account_data = await aggregator.load_jobs() aggregator_account_data = await aggregator.load_data() job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size] job_wallets: list[PublicKey] = [] wallet_bumps: list[int] = [] for job in job_account_data: authority = job.account.authority or PublicKey('11111111111111111111111111111111') pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(authority), bytes(TOKEN_PROGRAM_ID), bytes(switch_token_mint.pubkey), ], ASSOCIATED_TOKEN_PROGRAM_ID ) job_wallets.append(pubkey) wallet_bumps.append(bump) return await program.rpc["lease_extend"]( { "load_amount": params.load_amount, "state_bump": state_bump, "lease_bump": lease_bump, "wallet_bumps": bytes(wallet_bumps) }, ctx=anchorpy.Context( accounts={ "lease": lease_account.public_key, "aggregator": aggregator, "queue": queue, "funder": params.funder, "owner": params.funder_authority.public_key, "token_program": TOKEN_PROGRAM_ID, "escrow": escrow, "program_state": program_state_account.public_key, "mint": switch_token_mint.pubkey }, signers=[params.funder_authority], remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]] ) ) """ Withdraw stake and/or rewards from a LeaseAccount. Args: params (LeaseWithdrawParams) Returns: TransactionSignature Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def withdraw(self, params: LeaseWithdrawParams): program = self.program lease = await self.load_data() escrow = lease.escrow queue = lease.queue aggregator = lease.aggregator program_state_account, state_bump = ProgramStateAccount.from_seed(program) queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue)) switch_token_mint = await queue_account.load_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, OracleQueueAccount(AccountParams(program=program, public_key=queue)), AggregatorAccount(AccountParams(program=program, public_key=aggregator)) ) return await self.program.rpc["lease_withdraw"]( { "amount": params.amount, "state_bump": state_bump, "lease_bump": lease_bump }, ctx=anchorpy.Context( accounts={ "lease": lease_account.public_key, "escrow": escrow, "aggregator": aggregator, "queue": queue, "withdraw_authority": params.withdraw_authority.public_key, "withdraw_account": params.withdraw_wallet, "token_program": TOKEN_PROGRAM_ID, "program_state": program_state_account.public_key, "mint": switch_token_mint.pubkey }, signers=[params.withdraw_authority] ) )
A Switchboard account representing a lease for managing funds for oracle payouts for fulfilling feed updates.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This lease's public key keypair (Keypair | None): this lease's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Get the size of an LeaseAccount on chain
Args:
Returns: int: size of the LeaseAccount type on chain
View Source
def size(self): return self.program.account["LeaseAccountData"].size
View Source
async def load_data(self): return await LeaseAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
@staticmethod def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, aggregator_account: AggregatorAccount): pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(b'LeaseAccountData'), bytes(queue_account.public_key), bytes(aggregator_account.public_key), ], program.program_id ) return LeaseAccount(AccountParams(program=program, public_key=pubkey)), bump
View Source
@staticmethod async def create(program: anchorpy.Program, params: LeaseInitParams): program_state_account, state_bump = ProgramStateAccount.from_seed(program) switch_token_mint = await program_state_account.get_token_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, params.oracle_queue_account, params.aggregator_account ) job_account_data = await params.aggregator_account.load_jobs() aggregator_account_data = await params.aggregator_account.load_data() job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size] job_wallets: list[PublicKey] = [] wallet_bumps: list[int] = [] for job in job_account_data: authority = job.account.authority or PublicKey('11111111111111111111111111111111') pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(authority), bytes(TOKEN_PROGRAM_ID), bytes(switch_token_mint.pubkey), ], ASSOCIATED_TOKEN_PROGRAM_ID ) job_wallets.append(pubkey) wallet_bumps.append(bump) escrow = await switch_token_mint.create_associated_token_account(lease_account.public_key, skip_confirmation=False) await program.rpc["lease_init"]( { "load_amount": params.load_amount, "state_bump": state_bump, "lease_bump": lease_bump, "withdraw_authority": params.withdraw_authority or PublicKey('11111111111111111111111111111111'), "wallet_bumps": bytes(wallet_bumps) }, ctx=anchorpy.Context( accounts={ "program_state": program_state_account.public_key, "lease": lease_account.public_key, "queue": params.oracle_queue_account.public_key, "aggregator": params.aggregator_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "funder": params.funder, "payer": program.provider.wallet.public_key, "token_program": TOKEN_PROGRAM_ID, "escrow": escrow, "owner": params.funder_authority.public_key, "mint": switch_token_mint.pubkey }, signers=[params.funder_authority], remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]] ) ) return LeaseAccount(AccountParams(program=program, public_key=lease_account.public_key))
View Source
async def get_balance(self): lease = self.load_data() return await self.program.provider.connection.get_balance(lease.escrow)
View Source
async def extend(self, params: LeaseExtendParams): program = self.program lease = await self.load_data() escrow = lease.escrow queue = lease.queue aggregator = lease.aggregator program_state_account, state_bump = ProgramStateAccount.from_seed(program) queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue)) switch_token_mint = await queue_account.load_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, OracleQueueAccount(AccountParams(program=program, public_key=queue)), AggregatorAccount(AccountParams(program=program, public_key=aggregator)) ) job_account_data = await aggregator.load_jobs() aggregator_account_data = await aggregator.load_data() job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size] job_wallets: list[PublicKey] = [] wallet_bumps: list[int] = [] for job in job_account_data: authority = job.account.authority or PublicKey('11111111111111111111111111111111') pubkey, bump = publickey.PublicKey.find_program_address( [ bytes(authority), bytes(TOKEN_PROGRAM_ID), bytes(switch_token_mint.pubkey), ], ASSOCIATED_TOKEN_PROGRAM_ID ) job_wallets.append(pubkey) wallet_bumps.append(bump) return await program.rpc["lease_extend"]( { "load_amount": params.load_amount, "state_bump": state_bump, "lease_bump": lease_bump, "wallet_bumps": bytes(wallet_bumps) }, ctx=anchorpy.Context( accounts={ "lease": lease_account.public_key, "aggregator": aggregator, "queue": queue, "funder": params.funder, "owner": params.funder_authority.public_key, "token_program": TOKEN_PROGRAM_ID, "escrow": escrow, "program_state": program_state_account.public_key, "mint": switch_token_mint.pubkey }, signers=[params.funder_authority], remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]] ) )
View Source
async def withdraw(self, params: LeaseWithdrawParams): program = self.program lease = await self.load_data() escrow = lease.escrow queue = lease.queue aggregator = lease.aggregator program_state_account, state_bump = ProgramStateAccount.from_seed(program) queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue)) switch_token_mint = await queue_account.load_mint() lease_account, lease_bump = LeaseAccount.from_seed( program, OracleQueueAccount(AccountParams(program=program, public_key=queue)), AggregatorAccount(AccountParams(program=program, public_key=aggregator)) ) return await self.program.rpc["lease_withdraw"]( { "amount": params.amount, "state_bump": state_bump, "lease_bump": lease_bump }, ctx=anchorpy.Context( accounts={ "lease": lease_account.public_key, "escrow": escrow, "aggregator": aggregator, "queue": queue, "withdraw_authority": params.withdraw_authority.public_key, "withdraw_account": params.withdraw_wallet, "token_program": TOKEN_PROGRAM_ID, "program_state": program_state_account.public_key, "mint": switch_token_mint.pubkey }, signers=[params.withdraw_authority] ) )
View Source
@dataclass class LeaseExtendParams: """Token amount to load into the lease escrow""" load_amount: int """The funding wallet of the lease""" funder: PublicKey """The authority of the funding wallet""" funder_authority: Keypair
Token amount to load into the lease escrow
The funding wallet of the lease
The authority of the funding wallet
View Source
@dataclass class LeaseInitParams: """Token amount to load into the lease escrow""" load_amount: int """The funding wallet of the lease""" funder: PublicKey """The authority of the funding wallet""" funder_authority: Keypair """The target to which this lease is applied""" oracle_queue_account: OracleQueueAccount """The feed which the lease grants permission""" aggregator_account: AggregatorAccount """This authority will be permitted to withdraw funds from this lease""" withdraw_authority: PublicKey = None
Token amount to load into the lease escrow
The funding wallet of the lease
The authority of the funding wallet
The feed which the lease grants permission
This authority will be permitted to withdraw funds from this lease
View Source
@dataclass class LeaseWithdrawParams: """Token amount to withdraw from the lease escrow""" amount: int """The wallet of to withdraw to""" withdraw_wallet: PublicKey """The withdraw authority of the lease""" withdraw_authority: Keypair
Token amount to withdraw from the lease escrow
The wallet of to withdraw to
The withdraw authority of the lease
View Source
class OracleAccount: """ A Switchboard account representing an oracle account and its associated queue and escrow account. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Get the size of an OracleAccount on chain Args: Returns: int: size of the OracleAccount type on chain """ def size(self): return self.program.account["OracleAccountData"].size """ Load and parse OracleAccount data based on the program IDL Args: Returns: OracleAccount Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await OracleAccountData.fetch(self.program.provider.connection, self.public_key) """ Loads a OracleAccount from the expected PDA seed format Args: program (anchorpy.Program) queue_account (OracleQueueAccount) wallet (PublicKey) Returns: Tuple[OracleAccount, int]: OracleAccount and PDA bump """ @staticmethod def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, wallet: PublicKey): oracle_pubkey, bump = PublicKey.find_program_address( [ bytes(b'OracleAccountData'), bytes(queue_account.public_key), bytes(wallet), ], program.program_id ) return OracleAccount(AccountParams(program=program, public_key=oracle_pubkey)), bump """ Create and initialize the OracleAccount. Args: program (anchor.Program): Switchboard program representation holding connection and IDL. params (OracleInitParams) Returns: OracleAccount """ @staticmethod async def create(program: anchorpy.Program, params: OracleInitParams): payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key) program_state_account, state_bump = ProgramStateAccount.from_seed(program) switch_token_mint = await program_state_account.get_token_mint() wallet = await switch_token_mint.create_account(program.provider.wallet.public_key) await switch_token_mint.set_authority( wallet, program_state_account.public_key, 'AccountOwner', payer_keypair, [] ) oracle_account, oracle_bump = OracleAccount.from_seed( program, params.queue_account, wallet ) await program.rpc["oracle_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 128), "state_bump": state_bump, "oracle_bump": oracle_bump, }, ctx=anchorpy.Context( accounts={ "oracle": oracle_account.public_key, "oracle_authority": payer_keypair.public_key, "queue": params.queue_account.public_key, "wallet": wallet, "program_state": program_state_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key } ) ) return OracleAccount(AccountParams(program=program, public_key=oracle_account.public_key)) """ Inititates a heartbeat for an OracleAccount, signifying oracle is still healthy. Args: Returns: TransactionSignature Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def heartbeat(self): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) oracle = await self.load_data() queue_account = OracleQueueAccount(AccountParams(program=self.program,public_key=oracle.queue_pubkey)) queue_data = await queue_account.load_data() last_pubkey = self.public_key if queue_data.size != 0: last_pubkey = queue_data.queue[queue_data.gc_idx] permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_data.authority, queue_account.public_key, self.public_key ) try: await permission_account.load_data() except Exception: raise ValueError('A requested permission pda account has not been initialized.') return await self.program.rpc["oracle_heartbeat"]( { "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "oracle": self.public_key, "oracle_authority": payer_keypair.public_key, "token_account": oracle.token_account, "gc_oracle": last_pubkey, "oracle_queue": queue_account.public_key, "permission": permission_account.public_key, "data_buffer": queue_data.data_buffer }, signers=[self.keypair] ) ) """ Withdraw stake and/or rewards from an OracleAccount. Args: params (OracleWithdrawParams) Returns: TransactionSignature Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def withdraw(self, params: OracleWithdrawParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) oracle = await self.load_data() queue_pubkey = oracle.queue_pubkey queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey)) queue = await queue_account.load_data() queue_authority = queue.authority state_account, state_bump = ProgramStateAccount.from_seed(self.program) permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_authority, queue_account.public_key, self.public_key ) return await self.program.rpc["oracle_withdraw"]( { "permission_bump": permission_bump, "state_bump": state_bump, "amount": params.amount }, ctx=anchorpy.Context( accounts={ "oracle": self.public_key, "oracle_authority": params.oracle_authority.public_key, "token_account": oracle.token_account, "withdraw_account": params.withdraw_account, "oracle_queue": queue_account.public_key, "permission": permission_account.public_key, "token_program": TOKEN_PROGRAM_ID, "program_state": state_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": self.program.provider.wallet.public_key }, signers=[params.oracle_authority] ) )
A Switchboard account representing an oracle account and its associated queue and escrow account.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This aggregator's public key keypair (Keypair | None): this aggregator's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Get the size of an OracleAccount on chain
Args:
Returns: int: size of the OracleAccount type on chain
View Source
def size(self): return self.program.account["OracleAccountData"].size
View Source
async def load_data(self): return await OracleAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
@staticmethod def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, wallet: PublicKey): oracle_pubkey, bump = PublicKey.find_program_address( [ bytes(b'OracleAccountData'), bytes(queue_account.public_key), bytes(wallet), ], program.program_id ) return OracleAccount(AccountParams(program=program, public_key=oracle_pubkey)), bump
View Source
@staticmethod async def create(program: anchorpy.Program, params: OracleInitParams): payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key) program_state_account, state_bump = ProgramStateAccount.from_seed(program) switch_token_mint = await program_state_account.get_token_mint() wallet = await switch_token_mint.create_account(program.provider.wallet.public_key) await switch_token_mint.set_authority( wallet, program_state_account.public_key, 'AccountOwner', payer_keypair, [] ) oracle_account, oracle_bump = OracleAccount.from_seed( program, params.queue_account, wallet ) await program.rpc["oracle_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 128), "state_bump": state_bump, "oracle_bump": oracle_bump, }, ctx=anchorpy.Context( accounts={ "oracle": oracle_account.public_key, "oracle_authority": payer_keypair.public_key, "queue": params.queue_account.public_key, "wallet": wallet, "program_state": program_state_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key } ) ) return OracleAccount(AccountParams(program=program, public_key=oracle_account.public_key))
View Source
async def heartbeat(self): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) oracle = await self.load_data() queue_account = OracleQueueAccount(AccountParams(program=self.program,public_key=oracle.queue_pubkey)) queue_data = await queue_account.load_data() last_pubkey = self.public_key if queue_data.size != 0: last_pubkey = queue_data.queue[queue_data.gc_idx] permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_data.authority, queue_account.public_key, self.public_key ) try: await permission_account.load_data() except Exception: raise ValueError('A requested permission pda account has not been initialized.') return await self.program.rpc["oracle_heartbeat"]( { "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "oracle": self.public_key, "oracle_authority": payer_keypair.public_key, "token_account": oracle.token_account, "gc_oracle": last_pubkey, "oracle_queue": queue_account.public_key, "permission": permission_account.public_key, "data_buffer": queue_data.data_buffer }, signers=[self.keypair] ) )
View Source
async def withdraw(self, params: OracleWithdrawParams): payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) oracle = await self.load_data() queue_pubkey = oracle.queue_pubkey queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey)) queue = await queue_account.load_data() queue_authority = queue.authority state_account, state_bump = ProgramStateAccount.from_seed(self.program) permission_account, permission_bump = PermissionAccount.from_seed( self.program, queue_authority, queue_account.public_key, self.public_key ) return await self.program.rpc["oracle_withdraw"]( { "permission_bump": permission_bump, "state_bump": state_bump, "amount": params.amount }, ctx=anchorpy.Context( accounts={ "oracle": self.public_key, "oracle_authority": params.oracle_authority.public_key, "token_account": oracle.token_account, "withdraw_account": params.withdraw_account, "oracle_queue": queue_account.public_key, "permission": permission_account.public_key, "token_program": TOKEN_PROGRAM_ID, "program_state": state_account.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": self.program.provider.wallet.public_key }, signers=[params.oracle_authority] ) )
View Source
@dataclass class OracleInitParams: """Specifies the oracle queue to associate with this OracleAccount.""" queue_account: OracleQueueAccount """Buffer specifying orace name""" name: bytes = None """Buffer specifying oralce metadata""" metadata: bytes = None
Specifies the oracle queue to associate with this OracleAccount.
Buffer specifying orace name
Buffer specifying oralce metadata
View Source
@dataclass class OracleWithdrawParams: """Amount to withdraw""" amount: Decimal """Token Account to withdraw to""" withdraw_account: PublicKey """Oracle authority keypair""" oracle_authority: Keypair
Amount to withdraw
Token Account to withdraw to
Oracle authority keypair
View Source
class OracleQueueAccount: """A Switchboard account representing a queue for distributing oracles to permitted data feeds. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This OracleQueueAccount's public key keypair (Keypair | None): this OracleQueueAccount's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Get the size of an OracleQueueAccount on chain Args: Returns: int: size of the OracleQueueAccount type on chain """ def size(self): return self.program.account["OracleQueueAccountData"].size """ Load and parse OracleQueueAccount data based on the program IDL Args: Returns: OracleQueueAccount Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await OracleQueueAccountData.fetch(self.program.provider.connection, self.public_key) """ Fetch the token mint for this queue Args: Returns: AsyncToken """ async def load_mint(self) -> AsyncToken: payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) queue = await self.load_data() try: mint = AsyncToken(self.program.provider.connection, queue.mint, TOKEN_PROGRAM_ID, payer_keypair) return mint; except AttributeError: return AsyncToken(self.program.provider.connection, WRAPPED_SOL_MINT, TOKEN_PROGRAM_ID, payer_keypair) """ Create and initialize the OracleQueueAccount Args: program (anchor.Program) params (OracleQueueInitParams) Returns: OracleQueueAccount """ @staticmethod async def create(program: anchorpy.Program, params: OracleQueueInitParams): oracle_queue_account = Keypair.generate() buffer = Keypair.generate() queue_size = params.queue_size or 500 queue_size = queue_size * 32 + 8 response = await program.provider.connection.get_minimum_balance_for_rent_exemption(queue_size) lamports = response["result"] await program.rpc["oracle_queue_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 64), "reward": params.reward or 0, "min_stake": params.min_stake or 0, "feed_probation_period": params.feed_probation_period or 0, "oracle_timeout": params.oracle_timeout or 180, "slashing_enabled": params.slashing_enabled or False, "variance_tolerance_multiplier": SwitchboardDecimal.from_decimal(params.variance_tolerance_multiplier or Decimal(2)).as_proper_sbd(program), "authority": params.authority, "consecutive_feed_failure_limit": params.consecutive_feed_failure_limit or 1000, "consecutive_oracle_failure_limit": params.consecutive_oracle_failure_limit or 1000, "minimum_delay_seconds": params.minimum_delay_seconds or 5, "queue_size": params.queue_size or 0, "unpermissioned_feeds": params.unpermissioned_feeds or False, "unpermissioned_vrf": params.unpermissioned_feeds or False, "enable_buffer_relayers": False }, ctx=anchorpy.Context( accounts={ "oracle_queue": oracle_queue_account.public_key, "authority": params.authority, "buffer": buffer.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key, "mint": params.mint }, signers=[oracle_queue_account, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, lamports=lamports, space=queue_size, program_id=program.program_id ) ) ] ) ) return OracleQueueAccount(AccountParams(program=program, keypair=oracle_queue_account));
A Switchboard account representing a queue for distributing oracles to permitted data feeds.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This OracleQueueAccount's public key keypair (Keypair | None): this OracleQueueAccount's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Get the size of an OracleQueueAccount on chain
Args:
Returns: int: size of the OracleQueueAccount type on chain
View Source
def size(self): return self.program.account["OracleQueueAccountData"].size
View Source
async def load_data(self): return await OracleQueueAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
async def load_mint(self) -> AsyncToken: payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) queue = await self.load_data() try: mint = AsyncToken(self.program.provider.connection, queue.mint, TOKEN_PROGRAM_ID, payer_keypair) return mint; except AttributeError: return AsyncToken(self.program.provider.connection, WRAPPED_SOL_MINT, TOKEN_PROGRAM_ID, payer_keypair)
View Source
@staticmethod async def create(program: anchorpy.Program, params: OracleQueueInitParams): oracle_queue_account = Keypair.generate() buffer = Keypair.generate() queue_size = params.queue_size or 500 queue_size = queue_size * 32 + 8 response = await program.provider.connection.get_minimum_balance_for_rent_exemption(queue_size) lamports = response["result"] await program.rpc["oracle_queue_init"]( { "name": params.name or bytes([0] * 32), "metadata": params.metadata or bytes([0] * 64), "reward": params.reward or 0, "min_stake": params.min_stake or 0, "feed_probation_period": params.feed_probation_period or 0, "oracle_timeout": params.oracle_timeout or 180, "slashing_enabled": params.slashing_enabled or False, "variance_tolerance_multiplier": SwitchboardDecimal.from_decimal(params.variance_tolerance_multiplier or Decimal(2)).as_proper_sbd(program), "authority": params.authority, "consecutive_feed_failure_limit": params.consecutive_feed_failure_limit or 1000, "consecutive_oracle_failure_limit": params.consecutive_oracle_failure_limit or 1000, "minimum_delay_seconds": params.minimum_delay_seconds or 5, "queue_size": params.queue_size or 0, "unpermissioned_feeds": params.unpermissioned_feeds or False, "unpermissioned_vrf": params.unpermissioned_feeds or False, "enable_buffer_relayers": False }, ctx=anchorpy.Context( accounts={ "oracle_queue": oracle_queue_account.public_key, "authority": params.authority, "buffer": buffer.public_key, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key, "mint": params.mint }, signers=[oracle_queue_account, buffer], pre_instructions=[ create_account( CreateAccountParams( from_pubkey=program.provider.wallet.public_key, new_account_pubkey=buffer.public_key, lamports=lamports, space=queue_size, program_id=program.program_id ) ) ] ) ) return OracleQueueAccount(AccountParams(program=program, keypair=oracle_queue_account));
View Source
@dataclass class OracleQueueInitParams: """Mint for the oracle queue""" mint: PublicKey """Rewards to provide oracles and round openers on this queue.""" reward: int """The minimum amount of stake oracles must present to remain on the queue.""" min_stake: int """ The account to delegate authority to for creating permissions targeted at the queue. """ authority: PublicKey """Time period we should remove an oracle after if no response.""" oracle_timeout: int = None """ The tolerated variance amount oracle results can have from the accepted round result before being slashed. slashBound = varianceToleranceMultiplier * stdDeviation Default: 2 """ variance_tolerance_multiplier: Decimal = None """Consecutive failure limit for a feed before feed permission is revoked.""" consecutive_feed_failure_limit: int = None """ Consecutive failure limit for an oracle before oracle permission is revoked. """ consecutive_oracle_failure_limit: int = None """the minimum update delay time for Aggregators""" minimum_delay_seconds: int = None """Optionally set the size of the queue.""" queue_size: int = None """ Enabling this setting means data feeds do not need explicit permission to join the queue. """ unpermissioned_feeds: bool = None """Whether slashing is enabled on this queue""" slashing_enabled: bool = None """ After a feed lease is funded or re-funded, it must consecutively succeed N amount of times or its authorization to use the queue is auto-revoked. """ feed_probation_period: int = None """A name to assign to this OracleQueue.""" name: bytes = None """Buffer for queue metadata.""" metadata: bytes = None """ Enabling this setting means data feeds do not need explicit permission to request VRF proofs and verifications from this queue. """ unpermissioned_vrf: bool = None
Mint for the oracle queue
Rewards to provide oracles and round openers on this queue.
The minimum amount of stake oracles must present to remain on the queue.
The account to delegate authority to for creating permissions targeted at the queue.
The tolerated variance amount oracle results can have from the accepted round result before being slashed. slashBound = varianceToleranceMultiplier * stdDeviation Default: 2
Consecutive failure limit for a feed before feed permission is revoked.
Consecutive failure limit for an oracle before oracle permission is revoked.
the minimum update delay time for Aggregators
Optionally set the size of the queue.
Enabling this setting means data feeds do not need explicit permission to join the queue.
Whether slashing is enabled on this queue
After a feed lease is funded or re-funded, it must consecutively succeed N amount of times or its authorization to use the queue is auto-revoked.
A name to assign to this OracleQueue.
Buffer for queue metadata.
Enabling this setting means data feeds do not need explicit permission to request VRF proofs and verifications from this queue.
A ProtocolMessage
Field OracleJob.tasks
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.HttpTask
- DESCRIPTOR
- Header
- url
- method
- headers
- body
- Method
- METHOD_UNKOWN
- METHOD_GET
- METHOD_POST
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.JsonParseTask
- DESCRIPTOR
- path
- aggregation_method
- AggregationMethod
- NONE
- MIN
- MAX
- SUM
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.MedianTask
- DESCRIPTOR
- tasks
- jobs
- min_successful_required
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.MeanTask
- DESCRIPTOR
- tasks
- jobs
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.MaxTask
- DESCRIPTOR
- tasks
- jobs
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.ValueTask
- DESCRIPTOR
- value
- aggregator_pubkey
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.WebsocketTask
- DESCRIPTOR
- url
- subscription
- max_data_age_seconds
- filter
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.ConditionalTask
- DESCRIPTOR
- attempt
- on_failure
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.DivideTask
- DESCRIPTOR
- scalar
- aggregator_pubkey
- job
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.MultiplyTask
- DESCRIPTOR
- scalar
- aggregator_pubkey
- job
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.AddTask
- DESCRIPTOR
- scalar
- aggregator_pubkey
- job
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SubtractTask
- DESCRIPTOR
- scalar
- aggregator_pubkey
- job
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.LpTokenPriceTask
- DESCRIPTOR
- mercurial_pool_address
- saber_pool_address
- orca_pool_address
- raydium_pool_address
- price_feed_addresses
- price_feed_jobs
- use_fair_price
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.LpExchangeRateTask
- DESCRIPTOR
- in_token_address
- out_token_address
- mercurial_pool_address
- saber_pool_address
- orca_pool_token_mint_address
- raydium_pool_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.RegexExtractTask
- DESCRIPTOR
- pattern
- group_number
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.XStepPriceTask
- DESCRIPTOR
- step_job
- step_aggregator_pubkey
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.TwapTask
- DESCRIPTOR
- aggregator_pubkey
- period
- weight_by_propagation_time
- min_samples
- ending_unix_timestamp
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SerumSwapTask
- DESCRIPTOR
- serum_pool_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.PowTask
- DESCRIPTOR
- scalar
- aggregator_pubkey
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.LendingRateTask
- DESCRIPTOR
- protocol
- asset_mint
- field
- Field
- FIELD_DEPOSIT_RATE
- FIELD_BORROW_RATE
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.MangoPerpMarketTask
- DESCRIPTOR
- perp_market_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.JupiterSwapTask
- DESCRIPTOR
- in_token_address
- out_token_address
- base_amount
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.PerpMarketTask
- DESCRIPTOR
- mango_market_address
- drift_market_address
- zeta_market_address
- zo_market_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.OracleTask
- DESCRIPTOR
- switchboard_address
- pyth_address
- chainlink_address
- pyth_allowed_confidence_interval
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.AnchorFetchTask
- DESCRIPTOR
- program_id
- account_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.DefiKingdomsTask
- DESCRIPTOR
- Token
- provider
- in_token
- out_token
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.TpsTask
- DESCRIPTOR
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SplStakePoolTask
- DESCRIPTOR
- pubkey
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SplTokenParseTask
- DESCRIPTOR
- token_account_address
- mint_address
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.UniswapExchangeRateTask
- DESCRIPTOR
- in_token_address
- out_token_address
- in_token_amount
- slippage
- provider
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SushiswapExchangeRateTask
- DESCRIPTOR
- in_token_address
- out_token_address
- in_token_amount
- slippage
- provider
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.PancakeswapExchangeRateTask
- DESCRIPTOR
- in_token_address
- out_token_address
- in_token_amount
- slippage
- provider
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.CacheTask
- DESCRIPTOR
- name
- method
- Method
- METHOD_GET
- METHOD_SET
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.SysclockOffsetTask
- DESCRIPTOR
A ProtocolMessage
Inherited Members
- google.protobuf.pyext._message.CMessage
- CMessage
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- RegisterExtension
- FindInitializationErrors
- Extensions
- job_schemas_pb2.Task
- DESCRIPTOR
- http_task
- json_parse_task
- median_task
- mean_task
- websocket_task
- divide_task
- multiply_task
- lp_token_price_task
- lp_exchange_rate_task
- conditional_task
- value_task
- max_task
- regex_extract_task
- xstep_price_task
- add_task
- subtract_task
- twap_task
- serum_swap_task
- pow_task
- lending_rate_task
- mango_perp_market_task
- jupiter_swap_task
- perp_market_task
- oracle_task
- anchor_fetch_task
- defi_kingdoms_task
- tps_task
- spl_stake_pool_task
- spl_token_parse_task
- uniswap_exchange_rate_task
- sushiswap_exchange_rate_task
- pancakeswap_exchange_rate_task
- cache_task
- sysclock_offset_task
View Source
class PermissionAccount: """A Switchboard account representing a permission or privilege granted by one account signer to another account. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This permission's public key keypair (Keypair | None): this permission's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Check if a specific permission is enabled on this permission account Args: permission (SwitchboardPermissionValue) Returns: bool: whether or not the permission is enabled """ async def is_permission_enabled(self, permission: SwitchboardPermissionValue): perm_data = await self.load_data() permissions = perm_data.permissions return (permissions & permission) != 0 """ Load and parse PermissionAccount data based on the program IDL Args: Returns: PermissionAccount Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await PermissionAccountData.fetch(self.program.provider.connection, self.public_key) """ Get the size of a PermissionAccount on chain Args: Returns: int: size of the PermissionAccount type on chain """ def size(self): return self.program.account["PermissionAccountData"].size """ Create and initialize a PermissionAccount Args: program (anchor.Program) prarams (PermissionInitParams) Returns: PermissionAccount """ @staticmethod async def create(program: anchorpy.Program, params: PermissionInitParams): permission_account, permission_bump = PermissionAccount.from_seed( program, params.authority, params.granter, params.grantee ) await program.rpc["permission_init"]( { "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "permission": permission_account.public_key, "authority": params.authority, "granter": params.granter, "grantee": params.grantee, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key }, ) ) return permission_account """ Loads a PermissionAccount from the expected PDA seed format Args: program (anchorpy.Program) authority (public_key): The authority pubkey to be incorporated into the account seed. granter (public_key): The granter pubkey to be incorporated into the account seed. grantee (public_key): The grantee pubkey to be incorporated into the account seed. Returns: Tuple[PermissionAccount, int]: PermissionAccount and PDA bump """ @staticmethod def from_seed(program: anchorpy.Program, authority: PublicKey, granter: PublicKey, grantee: PublicKey): pubkey, bump = PublicKey.find_program_address( [ bytes(b'PermissionAccountData'), bytes(authority), bytes(granter), bytes(grantee) ], program.program_id ) return PermissionAccount(AccountParams(program=program, public_key=pubkey)), bump """ Sets the permission in the PermissionAccount Args: params (PermissionSetParams) Returns: TransactionSignature """ async def set(self, params: PermissionSetParams): self.program.rpc["permission_set"]( { "permission": self.program.type["SwitchboardPermission"][params.permission](), "authority": params.authority.public_key }, ctx=anchorpy.Context( accounts={ "permission": self.public_key, "authority": params.authority.public_key }, signers=[params.authority] ) )
A Switchboard account representing a permission or privilege granted by one account signer to another account.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This permission's public key keypair (Keypair | None): this permission's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Check if a specific permission is enabled on this permission account
Args: permission (SwitchboardPermissionValue)
Returns: bool: whether or not the permission is enabled
View Source
async def is_permission_enabled(self, permission: SwitchboardPermissionValue): perm_data = await self.load_data() permissions = perm_data.permissions return (permissions & permission) != 0
View Source
async def load_data(self): return await PermissionAccountData.fetch(self.program.provider.connection, self.public_key)
View Source
def size(self): return self.program.account["PermissionAccountData"].size
View Source
@staticmethod async def create(program: anchorpy.Program, params: PermissionInitParams): permission_account, permission_bump = PermissionAccount.from_seed( program, params.authority, params.granter, params.grantee ) await program.rpc["permission_init"]( { "permission_bump": permission_bump }, ctx=anchorpy.Context( accounts={ "permission": permission_account.public_key, "authority": params.authority, "granter": params.granter, "grantee": params.grantee, "system_program": system_program.SYS_PROGRAM_ID, "payer": program.provider.wallet.public_key }, ) ) return permission_account
View Source
@staticmethod def from_seed(program: anchorpy.Program, authority: PublicKey, granter: PublicKey, grantee: PublicKey): pubkey, bump = PublicKey.find_program_address( [ bytes(b'PermissionAccountData'), bytes(authority), bytes(granter), bytes(grantee) ], program.program_id ) return PermissionAccount(AccountParams(program=program, public_key=pubkey)), bump
View Source
async def set(self, params: PermissionSetParams): self.program.rpc["permission_set"]( { "permission": self.program.type["SwitchboardPermission"][params.permission](), "authority": params.authority.public_key }, ctx=anchorpy.Context( accounts={ "permission": self.public_key, "authority": params.authority.public_key }, signers=[params.authority] ) )
View Source
@dataclass class PermissionInitParams: """Pubkey of the account granting the permission""" granter: PublicKey """The receiving amount of a permission""" grantee: PublicKey """The authority that is allowed to set permissions for this account""" authority: PublicKey
Pubkey of the account granting the permission
The receiving amount of a permission
The authority that is allowed to set permissions for this account
View Source
@dataclass class PermissionSetParams: """The permission to set""" permission: SwitchboardPermission """The authority controlling this permission""" authority: Keypair """Specifies whether to enable or disable the permission""" enable: bool
The permission to set
The authority controlling this permission
View Source
class ProgramStateAccount: """Account type representing Switchboard global program state. Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This program's public key keypair (Keypair | None): this program's keypair """ def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair """ Constructs ProgramStateAccount from the static seed from which it was generated. Args: program (anchorpy.Program): Anchor-loaded aggregator Returns: ProgramStateAccount and PDA bump tuple. """ @staticmethod def from_seed(program: anchorpy.Program): state_pubkey, state_bump = publickey.PublicKey.find_program_address(['STATE'.encode()], program.program_id) return ProgramStateAccount(AccountParams(program=program, public_key=state_pubkey)), state_bump """ Load and parse ProgramStateAccount state based on the program IDL. Args: Returns: name (Any): data parsed in accordance with the Switchboard IDL. Raises: AccountDoesNotExistError: If the account doesn't exist. AccountInvalidDiscriminator: If the discriminator doesn't match the IDL. """ async def load_data(self): return await SbState.fetch(self.program.provider.connection, self.public_key) """ Fetch the Switchboard token mint specified in the program state account. Args: Returns: anchorpy. """ async def get_token_mint(self) -> AsyncToken: payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) state = await self.load_data() switch_token_mint = AsyncToken(self.program.provider.connection, state.token_mint, TOKEN_PROGRAM_ID, payer_keypair) return switch_token_mint """ Get the size of the global ProgramStateAccount on chain Returns: int: size of the ProgramStateAccount on chain """ def size(self): return self.program.account["SbState"].size """ Create and initialize the ProgramStateAccount Args: program (anchorpy.Program): anchor program params (ProgramInitParams): optionally pass in mint address Returns: ProgramStateAccount that was generated """ @staticmethod async def create(program: anchorpy.Program, params: ProgramInitParams): payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key) state_account, state_bump = ProgramStateAccount.from_seed(program) psa = ProgramStateAccount(AccountParams(program=program, public_key=state_account.public_key)) try: await psa.load_data() return psa except Exception: pass mint = None vault = None if params.mint == None: decimals = 9 mint, vault = await anchorpy.utils.token.create_mint_and_vault( program.provider, 100_000_000, payer_keypair.public_key, decimals ) else: mint = params.mint token = AsyncToken( program.provider.connection, mint, TOKEN_PROGRAM_ID, payer_keypair ) vault = await token.create_account(payer_keypair.public_key) await program.rpc["program_init"]( { "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "state": state_account.public_key, "authority": payer_keypair.public_key, "token_mint": mint, "vault": vault, "payer": payer_keypair.public_key, "system_program": system_program.SYS_PROGRAM_ID, "token_program": TOKEN_PROGRAM_ID }, ) ) """ Transfer N tokens from the program vault to a specified account. Args: to (PublicKey): The recipient of the vault tokens. authority (Keypair): The vault authority required to sign the transfer tx params (VaultTransferParams): Specifies the amount to transfer. Returns: TransactionSignature """ async def vault_transfer(self, to: PublicKey, authority: Keypair, params: VaultTransferParams): state_pubkey, state_bump = ProgramStateAccount.from_seed(self.program) state = await self.load_data() vault = state.token_vault await self.program.rpc["vault_transfer"]( { "state_bump": state_bump, "amount": params.amount # @FIXME - can't be a decimal, must have mantissa / scale }, ctx=anchorpy.Context( accounts={ "state": state_pubkey, "to": to, "vault": vault, "authority": authority.public_key, "token_program": TOKEN_PROGRAM_ID, }, signers=[authority] ) )
Account type representing Switchboard global program state.
Attributes: program (anchor.Program): The anchor program ref public_key (PublicKey | None): This program's public key keypair (Keypair | None): this program's keypair
View Source
def __init__(self, params: AccountParams): if params.public_key is None and params.keypair is None: raise ValueError('User must provide either a publicKey or keypair for account use.') if params.keypair and params.public_key and params.keypair.public_key != params.public_key: raise ValueError('User must provide either a publicKey or keypair for account use.') self.program = params.program self.public_key = params.keypair.public_key if params.keypair else params.public_key self.keypair = params.keypair
Constructs ProgramStateAccount from the static seed from which it was generated.
Args: program (anchorpy.Program): Anchor-loaded aggregator
Returns: ProgramStateAccount and PDA bump tuple.
View Source
@staticmethod def from_seed(program: anchorpy.Program): state_pubkey, state_bump = publickey.PublicKey.find_program_address(['STATE'.encode()], program.program_id) return ProgramStateAccount(AccountParams(program=program, public_key=state_pubkey)), state_bump
View Source
async def load_data(self): return await SbState.fetch(self.program.provider.connection, self.public_key)
View Source
async def get_token_mint(self) -> AsyncToken: payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key) state = await self.load_data() switch_token_mint = AsyncToken(self.program.provider.connection, state.token_mint, TOKEN_PROGRAM_ID, payer_keypair) return switch_token_mint
View Source
def size(self): return self.program.account["SbState"].size
View Source
@staticmethod async def create(program: anchorpy.Program, params: ProgramInitParams): payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key) state_account, state_bump = ProgramStateAccount.from_seed(program) psa = ProgramStateAccount(AccountParams(program=program, public_key=state_account.public_key)) try: await psa.load_data() return psa except Exception: pass mint = None vault = None if params.mint == None: decimals = 9 mint, vault = await anchorpy.utils.token.create_mint_and_vault( program.provider, 100_000_000, payer_keypair.public_key, decimals ) else: mint = params.mint token = AsyncToken( program.provider.connection, mint, TOKEN_PROGRAM_ID, payer_keypair ) vault = await token.create_account(payer_keypair.public_key) await program.rpc["program_init"]( { "state_bump": state_bump }, ctx=anchorpy.Context( accounts={ "state": state_account.public_key, "authority": payer_keypair.public_key, "token_mint": mint, "vault": vault, "payer": payer_keypair.public_key, "system_program": system_program.SYS_PROGRAM_ID, "token_program": TOKEN_PROGRAM_ID }, ) )
View Source
async def vault_transfer(self, to: PublicKey, authority: Keypair, params: VaultTransferParams): state_pubkey, state_bump = ProgramStateAccount.from_seed(self.program) state = await self.load_data() vault = state.token_vault await self.program.rpc["vault_transfer"]( { "state_bump": state_bump, "amount": params.amount # @FIXME - can't be a decimal, must have mantissa / scale }, ctx=anchorpy.Context( accounts={ "state": state_pubkey, "to": to, "vault": vault, "authority": authority.public_key, "token_program": TOKEN_PROGRAM_ID, }, signers=[authority] ) )
View Source
@dataclass class ProgramInitParams: """Optional token mint""" mint: PublicKey = None
Optional token mint
View Source
@dataclass class VaultTransferParams: """Amount being transferred""" amount: Decimal
Amount being transferred
View Source
@dataclass class SwitchboardDecimal: mantissa: int scale: int """ Convert BN.js style num and return SwitchboardDecimal Args: obj (Any): Object with integer fields scale and mantissa (hex val) Returns: sbd (SwitchboardDecimal): SwitchboardDecimal """ @staticmethod def fromObj(obj: Any): return SwitchboardDecimal( mantissa=obj.mantissa, scale=obj.scale ) def to_decimal(self, sbd: object): mantissa = Decimal(sbd.mantissa) scale = sbd.scale return mantissa / Decimal(10 ** scale) @staticmethod def from_decimal(dec: Decimal): _, digits, exponent = dec.as_tuple() integer = reduce(lambda rst, x: rst * 10 + x, digits) return SwitchboardDecimal(integer, exponent) # convert any switchboard-decimal-like object to a decimal @staticmethod def sbd_to_decimal(sbd: object) -> Decimal: mantissa = Decimal(sbd.mantissa) scale = sbd.scale return mantissa / Decimal(10 ** scale) # for sending as argument in transaction def as_proper_sbd(self, program: anchorpy.Program): return program.type['SwitchboardDecimal'](self.mantissa, self.scale) def __eq__(self, __o: object) -> bool: if not (hasattr(__o, 'mantissa') and hasattr(__o, 'scale')): return False return self.mantissa == __o.mantissa and self.scale == __o.scale
SwitchboardDecimal(mantissa: int, scale: int)
Convert BN.js style num and return SwitchboardDecimal
Args: obj (Any): Object with integer fields scale and mantissa (hex val)
Returns: sbd (SwitchboardDecimal): SwitchboardDecimal
View Source
@staticmethod def fromObj(obj: Any): return SwitchboardDecimal( mantissa=obj.mantissa, scale=obj.scale )
View Source
def to_decimal(self, sbd: object): mantissa = Decimal(sbd.mantissa) scale = sbd.scale return mantissa / Decimal(10 ** scale)
View Source
@staticmethod def from_decimal(dec: Decimal): _, digits, exponent = dec.as_tuple() integer = reduce(lambda rst, x: rst * 10 + x, digits) return SwitchboardDecimal(integer, exponent)
View Source
@staticmethod def sbd_to_decimal(sbd: object) -> Decimal: mantissa = Decimal(sbd.mantissa) scale = sbd.scale return mantissa / Decimal(10 ** scale)
View Source
def as_proper_sbd(self, program: anchorpy.Program): return program.type['SwitchboardDecimal'](self.mantissa, self.scale)