switchboardpy

The Switchboard Python v2 Wrapper.

"""The Switchboard Python v2 Wrapper."""

from switchboardpy.aggregator import (
    AggregatorAccount, 
    AggregatorHistoryRow, 
    AggregatorInitParams, 
    AggregatorOpenRoundParams, 
    AggregatorSaveResultParams, 
    AggregatorSetHistoryBufferParams,
)
from switchboardpy.compiled import OracleJob
from switchboardpy.common import SBV2_DEVNET_PID, AccountParams, SwitchboardDecimal
from switchboardpy.crank import CrankAccount, CrankPopParams, CrankInitParams, CrankPushParams, CrankRow
from switchboardpy.job import JobAccount, JobInitParams
from switchboardpy.lease import LeaseAccount, LeaseExtendParams, LeaseInitParams, LeaseWithdrawParams
from switchboardpy.oracle import OracleAccount, OracleInitParams, OracleWithdrawParams
from switchboardpy.oraclequeue import OracleQueueAccount, OracleQueueInitParams
from switchboardpy.permission import PermissionAccount, PermissionInitParams, PermissionSetParams
from switchboardpy.program import ProgramStateAccount, ProgramInitParams, VaultTransferParams

__all__ = [
    "AccountParams",
    "AggregatorAccount", 
    "AggregatorHistoryRow", 
    "AggregatorInitParams", 
    "AggregatorOpenRoundParams", 
    "AggregatorSaveResultParams", 
    "AggregatorSetHistoryBufferParams",
    "CrankAccount",
    "CrankPopParams",
    "CrankInitParams",
    "CrankPushParams",
    "CrankRow",
    "JobAccount",
    "JobInitParams",
    "LeaseAccount",
    "LeaseExtendParams",
    "LeaseInitParams",
    "LeaseWithdrawParams",
    "OracleAccount",
    "OracleInitParams",
    "OracleWithdrawParams",
    "OracleQueueAccount",
    "OracleQueueInitParams",
    "OracleJob",
    "PermissionAccount",
    "PermissionInitParams",
    "PermissionSetParams",
    "ProgramStateAccount",
    "ProgramInitParams",
    "VaultTransferParams",
    "SwitchboardDecimal",
    "readRawVarint32",
    "readDelimitedFrom"
]
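
A minimal sketch of loading the Switchboard program before using any of the account wrappers below. The RPC URL and payer keypair are placeholders, and anchorpy's Wallet/Provider/Program.at helpers are assumed to be available in a version compatible with this wrapper (Program.at is assumed to fetch the program's published on-chain IDL):

import asyncio

import anchorpy
from solana.keypair import Keypair
from solana.rpc.async_api import AsyncClient

from switchboardpy import SBV2_DEVNET_PID

async def load_program() -> anchorpy.Program:
    client = AsyncClient("https://api.devnet.solana.com")  # placeholder devnet RPC endpoint
    wallet = anchorpy.Wallet(Keypair())                    # placeholder payer; use a funded keypair
    provider = anchorpy.Provider(client, wallet)
    return await anchorpy.Program.at(SBV2_DEVNET_PID, provider)

# program = asyncio.run(load_program())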
#   class AccountParams:
@dataclass
class AccountParams:

    """program referencing the Switchboard program and IDL."""
    program: anchorpy.Program

    """
    Public key of the account being referenced. This will always be populated
    within the account wrapper.
    """
    public_key: PublicKey = None

    """Keypair of the account being referenced. This may not always be populated."""
    keypair: Keypair = None
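
A short construction sketch for AccountParams, assuming `program` is an anchorpy.Program loaded with the Switchboard IDL (see the loading sketch above); the feed key below is a placeholder:

from solana.publickey import PublicKey

from switchboardpy import AccountParams, AggregatorAccount

FEED_PUBKEY = PublicKey("11111111111111111111111111111111")  # placeholder; use a real feed public key

# Provide either a public_key or a keypair; a conflicting pair raises ValueError.
aggregator = AggregatorAccount(AccountParams(program=program, public_key=FEED_PUBKEY))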

#   class AggregatorAccount:
class AggregatorAccount:
    """AggregatorAccount is the wrapper for an Aggregator, the structure for that keeps aggregated feed data / metadata.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This aggregator's public key
        keypair (Keypair | None): this aggregator's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('Provided keypair and public_key do not match.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    

    """
    Get name of an aggregator.

    Args:
        aggregator (Any): Anchor-loaded aggregator

    Returns:
        name string of the aggregator
    """
    @staticmethod
    def get_name(aggregator: Any) -> str:
        return bytes(aggregator.name).decode("utf-8").replace("\u0000", "").strip()

        
    """
    Load and parse AggregatorAccount state based on the program IDL. 
    
    Returns:
        AggregatorAccount: data parsed in accordance with the
            Switchboard IDL.

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await AggregatorAccountData.fetch(self.program.provider.connection, self.public_key)
        

    """
    Get AggregatorAccount historical data 

    Returns:
        list[AggregatorHistoryRow]: the feed's history rows (empty if no
            history buffer has been set).

    Args:
        aggregator (Any): Optional aggregator 

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_history(self, aggregator: Any = None) -> Any:

        # if aggregator data passed in - use that, else load this aggregator
        aggregator = aggregator if aggregator else await self.load_data()

        # Compare history buffer to the default (zeroed-out) public key
        if aggregator.history_buffer == PublicKey('11111111111111111111111111111111'):
            return []

        # Fixed AggregatorHistoryRow size
        ROW_SIZE = 28

        # Get account data
        info = await self.program.provider.connection.get_account_info(aggregator.history_buffer)
        buffer = info.data if info else b""
        if not buffer or len(buffer) < 12:
            return []

        # Read the insertion index as a little-endian uint32 starting at byte 8
        insert_idx: int = struct.unpack_from("<L", buffer, 8)[0] * ROW_SIZE

        front = []
        tail = []

        for i in range(13, len(buffer), ROW_SIZE):
            if i + ROW_SIZE > len(buffer):
                break
            row = AggregatorHistoryRow.from_buffer(buffer[i:i + ROW_SIZE])
            if row.timestamp == 0:
                break
            if i <= insert_idx:
                tail.append(row)
            else:
                front.append(row)
        return front + tail

    """
    Get the latest confirmed value stored in the aggregator account. 
    
    Args:
        aggregator (Any): Optional aggregator value to pass in

    Returns:
        value (Decimal): the latest feed value

    Raises:
        ValueError: If the aggregator currently holds no value
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def get_latest_value(self, aggregator: Optional[Any] = None) -> Decimal:
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')
        return SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result)


    """
    Get the timestamp of the latest confirmed round stored in the aggregator account.
    
    Args:
        aggregator (Any): Optional aggregator value to pass in

    Returns:
        timestamp (int): unix timestamp at which the latest confirmed round was opened

    Raises:
        ValueError: If the aggregator currently holds no value
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def get_latest_feed_timestamp(self, aggregator: Optional[Any] = None) -> int:
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')

        return aggregator.latest_confirmed_round.round_open_timestamp
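
    # Usage sketch: with an AggregatorAccount wrapping an existing feed (see the
    # construction example near AccountParams), the latest confirmed value and its
    # round-open timestamp can be read back to back, e.g.
    #
    #     value = await aggregator.get_latest_value()        # Decimal
    #     ts = await aggregator.get_latest_feed_timestamp()  # unix timestamp
    #
    # Both calls raise ValueError if the feed has never confirmed a round.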


    """
    Get name of an aggregator.

    Args:
        aggregator (any): Anchor-loaded aggregator

    Returns:
        name string of the aggregator
    """
    @staticmethod
    def should_report_value(value: Decimal, aggregator: Optional[Any] = None) -> bool:
        if aggregator.latest_confirmed_round and aggregator.latest_confirmed_round.num_success == 0:
            return True
        timestamp = round(time.time())
        if aggregator.start_after > timestamp:
            return False
        variance_threshold = SwitchboardDecimal.sbd_to_decimal(aggregator.variance_threshold)
        latest_result = SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.result)
        force_report_period = aggregator.force_report_period
        last_timestamp = aggregator.latest_confirmed_round.round_open_timestamp
        if last_timestamp + force_report_period < timestamp:
            return True
        diff = latest_result / value
        if abs(diff) > 1:
            diff = value / latest_result
        if diff < 0:
            return True
        
        change_percentage = (1 - diff) * 100
        return change_percentage > variance_threshold
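
    # Worked sketch of the check above with illustrative numbers: if the stored
    # latest_result is 100 and the proposed value is 102, then diff = 100 / 102 ~= 0.980
    # and change_percentage = (1 - 0.980) * 100 ~= 1.96, so the value is reported only if
    # 1.96 exceeds the feed's variance_threshold (or if force_report_period has already
    # elapsed since the last round, in which case it is reported regardless).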

    """
    Get the individual oracle results of the latest confirmed round. 
    
    Args:
        aggregator (Any): Optional aggregator value to pass in

    Returns:
        list: one entry per successful oracle in the latest confirmed round, each
            with the responding OracleAccount and its reported value

    Raises:
        ValueError: If aggregator currently holds no value.
    """
    async def get_confirmed_round_results(self, aggregator: Optional[Any] = None) -> list:
        
        aggregator = aggregator if aggregator else await self.load_data()
        if hasattr(aggregator, 'latest_confirmed_round') and aggregator.latest_confirmed_round.num_success == 0:
            raise ValueError('Aggregator currently holds no value.')
        results: list[Any] = []
        for i in range(aggregator.oracle_request_batch_size):
            if aggregator.latest_confirmed_round.medians_fulfilled[i]:
                results.append({
                    "oracle_account": OracleAccount(AccountParams(program=self.program, public_key=aggregator.latest_confirmed_round.oracle_pubkeys_data[i])),
                    "value": SwitchboardDecimal.sbd_to_decimal(aggregator.latest_confirmed_round.medians_data[i])
                })
        return results

    """
    Get the hash of a list of OracleJobs
    
    Args:
        jobs (list[OracleJob]): list of jobs to hash

    Returns:
        hash: sha256 hash object over the serialized jobs; call .digest() or
            .hexdigest() on it for the raw or hex value
    """
    @staticmethod
    def produce_job_hash(jobs: list[OracleJob]):
        hash = hashlib.sha256()
        for job in jobs:
            job_hasher = hashlib.sha256()
            job_hasher.update(job.SerializeToString())
            hash.update(job_hasher.digest())
        return hash


    """
    Load and deserialize all jobs stored in this aggregator
    
    Args:
        aggregator (Any): Optional aggregator

    Returns:
        jobs (list[AggregatorLoadedJob]): the deserialized OracleJob, public key and account data for each job

    Raises:
        ValueError: Failed to load feed jobs.
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_jobs(self, aggregator: Optional[Any] = None) -> list:
        coder = anchorpy.AccountsCoder(self.program.idl)
        aggregator = aggregator if aggregator else await self.load_data()
        job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size], 10, Confirmed)
        if not job_accounts_raw:
            raise ValueError('Failed to load feed jobs.')
        
        # Deserialize OracleJob objects from each decoded JobAccountData 
        return [AggregatorLoadedJob(parseOracleJob(coder.decode(job.account.data).data), job.pubkey, coder.decode(job.account.data)) for job in job_accounts_raw]
        
    """
    Load all job hashes for each job stored in this aggregator
    
    Args:
        aggregator (Any): Optional aggregator

    Returns:
        hashes (list[str]): hashes for each job 

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_hashes(self, aggregator: Optional[Any] = None) -> list:
        coder = anchorpy.AccountsCoder(self.program.idl)
        aggregator = aggregator if aggregator else await self.load_data()
        job_accounts_raw = await anchorpy.utils.rpc.get_multiple_accounts(self.program.provider.connection, aggregator.job_pubkeys_data[:aggregator.job_pubkeys_size])
        if not job_accounts_raw:
            raise ValueError('Failed to load feed jobs.')
        
        # get hashes from each decoded JobAccountData 
        return [coder.decode(job.account.data).hash for job in job_accounts_raw]
        
    
    """
    Get the size of an AggregatorAccount on chain
    
    Returns:
        int: size of the AggregatorAccount on chain
    """
    def size(self):
        return self.program.account["AggregatorAccountData"].size

    """
    Create and initialize the AggregatorAccount.
    
    Args:
        program (anchorpy.Program): Switchboard program representation holding connection and IDL
        params (AggregatorInitParams): init params for the aggregator

    Returns:
        AggregatorAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, aggregator_init_params: AggregatorInitParams):
        aggregator_account = aggregator_init_params.keypair or Keypair.generate()
        authority = aggregator_init_params.authority or aggregator_account.public_key
        size = program.account["AggregatorAccountData"].size
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        state = await state_account.load_data()
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        zero_decimal = SwitchboardDecimal(0, 0).as_proper_sbd(program)

        await program.rpc["aggregator_init"](
            {
                "name": aggregator_init_params.name or bytes([0] * 32),
                "metadata": aggregator_init_params.metadata or bytes([0] * 128),
                "batch_size": aggregator_init_params.batch_size,
                "min_oracle_results": aggregator_init_params.min_required_oracle_results,
                "min_job_results": aggregator_init_params.min_required_job_results,
                "min_update_delay_seconds": aggregator_init_params.min_update_delay_seconds,
                "variance_threshold": SwitchboardDecimal.from_decimal(aggregator_init_params.variance_threshold).as_proper_sbd(program) if aggregator_init_params.variance_threshold else zero_decimal,
                "force_report_period": aggregator_init_params.force_report_period or 0,
                "expiration": aggregator_init_params.expiration or 0,
                "state_bump": state_bump,
                "disable_crank": aggregator_init_params.disable_crank or False,
                "start_after": aggregator_init_params.start_after or 0,
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": aggregator_account.public_key,
                    "authority": authority,
                    "queue": aggregator_init_params.queue_account.public_key,
                    "author_wallet": aggregator_init_params.author_wallet or state.token_vault,
                    "program_state": state_account.public_key
                },
                signers=[aggregator_account],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=aggregator_account.public_key,
                            lamports=lamports, 
                            space=size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
        return AggregatorAccount(AccountParams(program=program, keypair=aggregator_account))
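
    # Usage sketch: creating a feed against an existing queue. Only the required
    # AggregatorInitParams fields are shown; `queue_account` is assumed to be an
    # OracleQueueAccount for the target queue.
    #
    #     init_params = AggregatorInitParams(
    #         batch_size=1,
    #         min_required_oracle_results=1,
    #         min_required_job_results=1,
    #         min_update_delay_seconds=10,
    #         queue_account=queue_account,
    #     )
    #     aggregator = await AggregatorAccount.create(program, init_params)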

    """
    Create and set a history buffer for the aggregator

    Args:
        params (AggregatorSetHistoryBufferParams)

    Returns:
        TransactionSignature
    """
    async def set_history_buffer(self, params: AggregatorSetHistoryBufferParams):
        buffer = Keypair.generate()
        program = self.program
        authority = params.authority or self.keypair
        HISTORY_ROW_SIZE = 28
        INSERT_IDX_SIZE = 4
        DISCRIMINATOR_SIZE = 8
        size = params.size * HISTORY_ROW_SIZE + INSERT_IDX_SIZE + DISCRIMINATOR_SIZE
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        await program.rpc["aggregator_set_history_buffer"](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "buffer": buffer.public_key
                },
                signers=[authority, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key,
                            new_account_pubkey=buffer.public_key,
                            space=size,
                            lamports=lamports,
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
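
    # Usage sketch: allocating a 1000-row history buffer for the feed, assuming
    # `feed_authority` is the aggregator's authority keypair.
    #
    #     await aggregator.set_history_buffer(
    #         AggregatorSetHistoryBufferParams(size=1000, authority=feed_authority)
    #     )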

    """
    Open round on aggregator to get an update

    Args:
        params (AggregatorOpenRoundParams)

    Returns:
        TransactionSignature
    """
    async def open_round(self, params: AggregatorOpenRoundParams):
        program = self.program
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        queue = await params.oracle_queue_account.load_data()
        lease_account, lease_bump = LeaseAccount.from_seed(
            self.program,
            params.oracle_queue_account,
            self
        )
        lease = await lease_account.load_data()
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue.authority,
            params.oracle_queue_account.public_key,
            self.public_key
        )
        return await program.rpc["aggregator_open_round"](
            {
                "state_bump": state_bump,
                "lease_bump": lease_bump,
                "permission_bump": permission_bump,
                "jitter": params.jitter or 0
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "lease":  lease_account.public_key,
                    "oracle_queue": params.oracle_queue_account.public_key,
                    "queue_authority": queue.authority,
                    "permission": permission_account.public_key,
                    "escrow": lease.escrow,
                    "program_state": state_account.public_key,
                    "payout_wallet": params.payout_wallet,
                    "token_program": TOKEN_PROGRAM_ID,
                    "data_buffer": queue.data_buffer,
                    "mint": (await params.oracle_queue_account.load_mint()).pubkey,
                },
            )
        )
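
    # Usage sketch: requesting a new round, assuming `queue_account` is the feed's
    # OracleQueueAccount and `payout_wallet` is a token account of the queue's mint
    # that receives the open-round reward.
    #
    #     await aggregator.open_round(
    #         AggregatorOpenRoundParams(
    #             oracle_queue_account=queue_account,
    #             payout_wallet=payout_wallet,
    #         )
    #     )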

    """
    Set min jobs sets the min jobs parameter. This is a suggestion to oracles 
    of the number of jobs that must resolve for a job to be considered valid.

    Args:
        params (AggregatorSetMinJobsParams): parameters pecifying the min jobs that must respond
    Returns:
        TransactionSignature

    """
    async def set_min_jobs(self, params: AggregatorSetMinJobsParams, authority: Optional[Keypair] = None):
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_set_min_jobs'](
            {
                "min_job_results": params.min_job_results
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )

    """
    Set min oracles sets the min oracles parameter. This will determine how many oracles need to come back with a 
    valid response for a result to be accepted. 

    Args:
        params (AggregatorSetMinOraclesParams): parameters pecifying the min jobs that must respond
    Returns:
        TransactionSignature

    """
    async def set_min_jobs(self, params: AggregatorSetMinJobsParams):
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_set_min_jobs'](
            {
                "min_job_results": params.min_job_results
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )
    
    """
    RPC to add a new job to an aggregtor to be performed on feed updates.

    Args:
        job (JobAccount): specifying another job for this aggregator to fulfill on update
        authority (Keypair | None)
    Returns:
        TransactionSignature
    """
    async def add_job(self, job: JobAccount, weight: int = 0, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        
        return await self.program.rpc['aggregator_add_job'](
            {
                "weight": weight
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "job": job.public_key
                },
                signers=[authority]
            )
        )
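
    # Usage sketch: attaching an existing job to the feed, assuming `job_account` is a
    # JobAccount wrapper for an already-created job and `feed_authority` is the
    # aggregator's authority keypair.
    #
    #     await aggregator.add_job(job_account, weight=1, authority=feed_authority)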

    """
    RPC Set batch size / the number of oracles that'll respond to updates

    Args:
        params (AggregatorSetBatchSizeParams)
    Returns:
        TransactionSignature
    """
    async def set_batch_size(self, params: AggregatorSetBatchSizeParams, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_set_batch_size'](
            {
                "batch_size": params.batch_size
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )
        

    """
    RPC set variance threshold (only write updates when response is > variance threshold %)

    Args:
        params (AggregatorSetVarianceThresholdParams)
    Returns:
        TransactionSignature
    """
    async def set_variance_threshold(self, params: AggregatorSetVarianceThresholdParams, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        
        return await self.program.rpc['aggregator_set_variance_threshold'](
            {
                "variance_threshold": SwitchboardDecimal.from_decimal(params.threshold)
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )

    """
    RPC set min oracles

    Args:
        params (AggregatorSetMinOraclesParams)
    Returns:
        TransactionSignature
    """
    async def set_min_oracles(self, params: AggregatorSetMinOraclesParams, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        
        return await self.program.rpc['aggregator_set_min_oracles'](
            {
                "min_oracle_results": params.min_oracle_results
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )

    """
    RPC set update interval

    Args:
        params (AggregatorSetUpdateIntervalParams)
    Returns:
        TransactionSignature
    """
    async def set_update_interval(self, params: AggregatorSetUpdateIntervalParams, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        
        return await self.program.rpc['aggregator_set_update_interval'](
            {
                "new_interval": params.new_interval
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )
        
    """
    Prevent new jobs from being added to the feed.

    Args:
        authority (Keypair | None): the current authority keypair

    Returns:
        TransactionSignature
    """
    async def lock(self, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_lock'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                },
                signers=[authority]
            )
        )

    """
    Change the aggregator authority

    Args:
        new_authority (Keypair): The new authority
        current_authority (Keypair | None): the current authority keypair

    Returns:
        TransactionSignature
    """
    async def set_authority(self, new_authority: Keypair, current_authority: Optional[Keypair] = None) -> TransactionSignature:
        current_authority = current_authority or self.keypair
        return await self.program.rpc['aggregator_set_authoirty'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "new_authority": new_authority,
                    "authority": current_authority.public_key,
                },
                signers=[current_authority]
            )
        )

    """
    RPC to remove a job from an aggregator.

    Args:
        job (JobAccount): specifying job to remove
        authority (Keypair | None)
    Returns:
        TransactionSignature
    """
    async def remove_job(self, job: JobAccount, authority: Optional[Keypair] = None) -> TransactionSignature:
        authority = authority or self.keypair
        return await self.program.rpc['aggregator_remove_job'](
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "authority": authority.public_key,
                    "job": job.public_key
                },
                signers=[authority]
            )
        )
    
    """
    Get Index of Oracle in Aggregator

    Args:
        oracle_pubkey (PublicKey): Public key belonging to the oracle

    Returns:
        int: index of the oracle, -1 if not found
    """
    async def get_oracle_index(self, oracle_pubkey: PublicKey):
        aggregator = await self.load_data()
        for i, curr_oracle_pubkey in enumerate(aggregator.current_round.oracle_pubkeys_data):
            if curr_oracle_pubkey == oracle_pubkey:
                return i
        return -1
    
    """
    Save Aggregator result

    Args:
        aggregator (Any): Aggregator data
        oracle_account (OracleAccount)
        params (AggregatorSaveResultParams)

    Returns:
        TransactionSignature
    """
    async def save_result(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams) -> TransactionSignature:
        return await self.program.provider.send(
            tx=(
                await self.save_result_txn(
                    aggregator,
                    oracle_account,
                    params
                )
            )
        )
    
    """
    RPC call for an oracle to save a result to an aggregator round.

    Args:
        aggregator (Any): Aggregator data
        oracle_account (OracleAccount)
        params (AggregatorSaveResultParams)

    Returns:
        TransactionSignature
    """
    async def save_result_txn(self, aggregator: Any, oracle_account: OracleAccount, params: AggregatorSaveResultParams):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        remaining_accounts: list[PublicKey] = []
        for i in range(aggregator.oracle_request_batch_size):
            remaining_accounts.append(aggregator.current_round.oracle_pubkeys_data[i])
        for oracle in params.oracles:
            remaining_accounts.append(oracle.token_account)
        queue_pubkey = aggregator.queue_pubkey
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey))
        lease_account, lease_bump = LeaseAccount.from_seed(
            self.program,
            queue_account,
            self
        )
        escrow = get_associated_token_address(lease_account.public_key, params.token_mint)
        feed_permission_account, feed_permission_bump = PermissionAccount.from_seed(
            self.program,
            params.queue_authority,
            queue_account.public_key,
            self.public_key
        )
        oracle_permission_account, oracle_permission_bump = PermissionAccount.from_seed(
            self.program,
            params.queue_authority,
            queue_account.public_key,
            oracle_account.public_key
        )
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        digest = self.produce_job_hash(params.jobs).digest()
        history_buffer = aggregator.history_buffer
        if history_buffer == PublicKey('11111111111111111111111111111111'):
            history_buffer = self.public_key
        return self.program.transaction['aggregator_save_result'](
            {
                "oracle_idx": params.oracle_idx,
                "error": params.error,
                "value": SwitchboardDecimal.from_decimal(params.value).as_proper_sbd(self.program),
                "jobs_checksum": digest,
                "min_response": SwitchboardDecimal.from_decimal(params.min_response).as_proper_sbd(self.program),
                "max_response": SwitchboardDecimal.from_decimal(params.max_response).as_proper_sbd(self.program),
                "feed_permission_bump": feed_permission_bump,
                "oracle_permission_bump": oracle_permission_bump,
                "lease_bump": lease_bump,
                "state_bump": state_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "aggregator": self.public_key,
                    "oracle": oracle_account.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "oracle_queue": queue_account.public_key,
                    "feed_permission": feed_permission_account.public_key,
                    "oracle_permission": oracle_permission_account.public_key,
                    "lease": lease_account.public_key,
                    "escrow": escrow,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": program_state_account.public_key,
                    "history_buffer": history_buffer,
                    "mint": params.token_mint
                },
                remaining_accounts=[{"is_signer": False, "is_writable": True, "pubkey": pubkey} for pubkey in remaining_accounts]
            )
        )
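
A short read-side sketch tying the helpers above together; it assumes `aggregator` is an AggregatorAccount for a feed that has already confirmed at least one round, and that the code runs inside an async function:

data = await aggregator.load_data()
latest = await aggregator.get_latest_value(data)              # Decimal
history = await aggregator.load_history(data)                 # [] unless a history buffer is set
per_oracle = await aggregator.get_confirmed_round_results(data)
for entry in per_oracle:
    print(entry["oracle_account"].public_key, entry["value"])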

#  
@dataclass
class AggregatorHistoryRow:
View Source
@dataclass
class AggregatorHistoryRow:
    """AggregatorHistoryRow is a wrapper for the row structure of elements in the aggregator history buffer.
    
    Attributes:
        timestamp (int): timestamp of the aggregator result
        value (Decimal): Aggregator value at the timestamp
    """
    timestamp: int
    value: Decimal
    
    """
    Generate an AggregatorHistoryRow from a retrieved buffer representation

    Args:
        buf (list): Anchor-loaded buffer representation of AggregatorHistoryRow

    Returns:
        AggregatorHistoryRow
    """
    @staticmethod
    def from_buffer(buf: bytes):
        timestamp: int = struct.unpack_from("<L", buf[:8])[0]
        mantissa: int = struct.unpack_from("<L", buf[8:24])[0]
        scale: int = struct.unpack_from("<L", buf, 24)[0]
        decimal = SwitchboardDecimal.sbd_to_decimal({"mantissa": mantissa, "scale": scale})
        res = AggregatorHistoryRow(timestamp, decimal)
        return res

AggregatorHistoryRow is a wrapper for the row structure of elements in the aggregator history buffer.

Attributes:
    timestamp (int): timestamp of the aggregator result
    value (Decimal): Aggregator value at the timestamp

#   AggregatorHistoryRow(timestamp: int, value: decimal.Decimal)
#   value: decimal.Decimal

Generate an AggregatorHistoryRow from a retrieved buffer representation

Args: buf (list): Anchor-loaded buffer representation of AggregatorHistoryRow

Returns: AggregatorHistoryRow

#  
@staticmethod
def from_buffer(buf: bytes):
View Source
    @staticmethod
    def from_buffer(buf: bytes):
        timestamp: int = struct.unpack_from("<L", buf[:8])[0]
        mantissa: int = struct.unpack_from("<L", buf[8:24])[0]
        scale: int = struct.unpack_from("<L", buf, 24)[0]
        decimal = SwitchboardDecimal.sbd_to_decimal({"mantissa": mantissa, "scale": scale})
        res = AggregatorHistoryRow(timestamp, decimal)
        return res
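A small decoding sketch, assuming raw_buffer is the raw history buffer bytes and offset points at the start of one row; the 28-byte slice width simply mirrors the offsets read above:

    # Sketch: decode a single history row slice out of a raw history buffer.
    from switchboardpy import AggregatorHistoryRow

    row_bytes = raw_buffer[offset:offset + 28]   # raw_buffer/offset are placeholders
    row = AggregatorHistoryRow.from_buffer(row_bytes)
    print(row.timestamp, row.value)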
#  
@dataclass
class AggregatorInitParams:
View Source
@dataclass
class AggregatorInitParams:
    """Number of oracles to request on aggregator update."""
    batch_size: int

    """Minimum number of oracle responses required before a round is validated."""
    min_required_oracle_results: int

    """Minimum number of seconds required between aggregator rounds."""
    min_required_job_results: int

    """Minimum number of seconds required between aggregator rounds."""
    min_update_delay_seconds: int

    """The queue to which this aggregator will be linked"""
    queue_account: OracleQueueAccount
 
    """Name of the aggregator to store on-chain."""
    name: bytes = None

    """Metadata of the aggregator to store on-chain."""
    metadata: bytes = None

    """unix_timestamp for which no feed update will occur before."""
    start_after: int = None

    """
    Change percentage required between a previous round and the current round.
    If variance percentage is not met, reject new oracle responses.
    """
    variance_threshold: Decimal = None

    """
    Number of seconds for which, even if the variance threshold is not passed,
    accept new responses from oracles.
    """
    force_report_period: int = None

    """
    unix_timestamp after which funds may be withdrawn from the aggregator.
    null/undefined/0 means the feed has no expiration.
    """
    expiration: int = None

    """
    An optional wallet for receiving kickbacks from job usage in feeds.
    Defaults to token vault.
    """
    keypair: Keypair = None
    
    """
    An optional wallet for receiving kickbacks from job usage in feeds.
    Defaults to token vault.
    """
    author_wallet: PublicKey = None

    """
    If included, this keypair will be the aggregator authority rather than
    the aggregator keypair.
    """
    authority: PublicKey = None

    """Disable automatic updates"""
    disable_crank: bool = None

Number of oracles to request on aggregator update.

#   AggregatorInitParams( batch_size: int, min_required_oracle_results: int, min_required_job_results: int, min_update_delay_seconds: int, queue_account: switchboardpy.OracleQueueAccount, name: bytes = None, metadata: bytes = None, start_after: int = None, variance_threshold: decimal.Decimal = None, force_report_period: int = None, expiration: int = None, keypair: solana.keypair.Keypair = None, author_wallet: solana.publickey.PublicKey = None, authority: solana.publickey.PublicKey = None, disable_crank: bool = None )
#   batch_size: int

Minimum number of oracle responses required before a round is validated.

#   min_required_oracle_results: int

Minimum number of job results required before an oracle can submit a response.

#   min_required_job_results: int

Minimum number of seconds required between aggregator rounds.

#   min_update_delay_seconds: int

The queue to which this aggregator will be linked

Name of the aggregator to store on-chain.

#   name: bytes = None

Metadata of the aggregator to store on-chain.

#   metadata: bytes = None

unix_timestamp for which no feed update will occur before.

#   start_after: int = None

Change percentage required between a previous round and the current round. If variance percentage is not met, reject new oracle responses.

#   variance_threshold: decimal.Decimal = None

Number of seconds for which, even if the variance threshold is not passed, accept new responses from oracles.

#   force_report_period: int = None

unix_timestamp after which funds may be withdrawn from the aggregator. null/undefined/0 means the feed has no expiration.

#   expiration: int = None

An optional pre-generated keypair to use for the aggregator account.

#   keypair: solana.keypair.Keypair = None

An optional wallet for receiving kickbacks from job usage in feeds. Defaults to token vault.

#   author_wallet: solana.publickey.PublicKey = None

If included, this pubkey will be the aggregator authority rather than the aggregator keypair.

#   authority: solana.publickey.PublicKey = None

Disable automatic updates

#   disable_crank: bool = None
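As an illustration, a feed configuration might be assembled like this; the values are arbitrary, queue_account is an existing OracleQueueAccount, and the resulting params would be handed to the aggregator creation call:

    # Sketch: an arbitrary feed configuration for a new aggregator.
    from decimal import Decimal
    from switchboardpy import AggregatorInitParams

    init_params = AggregatorInitParams(
        batch_size=3,                    # request 3 oracles per update
        min_required_oracle_results=2,   # accept a round once 2 oracles respond
        min_required_job_results=1,
        min_update_delay_seconds=30,
        queue_account=queue_account,
        name=b"EXAMPLE_FEED",
        variance_threshold=Decimal("0.5"),
    )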
#  
@dataclass
class AggregatorOpenRoundParams:
View Source
@dataclass
class AggregatorOpenRoundParams:

    """The oracle queue from which oracles are assigned this update."""
    oracle_queue_account: OracleQueueAccount
    
    """The token wallet which will receive rewards for calling update on this feed."""
    payout_wallet: PublicKey

    """
    Data feeds on a crank are ordered by their next available update time with some 
    level of jitter to mitigate oracles being assigned to the same update request upon 
    each iteration of the queue, which makes them susceptible to a malicious oracle. 
    """
    jitter: int = None

The oracle queue from which oracles are assigned this update.

#   AggregatorOpenRoundParams( oracle_queue_account: switchboardpy.OracleQueueAccount, payout_wallet: solana.publickey.PublicKey, jitter: int = None )

The token wallet which will receive rewards for calling update on this feed.

#   payout_wallet: solana.publickey.PublicKey

Data feeds on a crank are ordered by their next available update time with some level of jitter to mitigate oracles being assigned to the same update request upon each iteration of the queue, which makes them susceptible to a malicious oracle.

#   jitter: int = None
#  
@dataclass
class AggregatorSaveResultParams:
View Source
@dataclass
class AggregatorSaveResultParams:

    """Index in the list of oracles in the aggregator assigned to this round update."""
    oracle_idx: int

    """Reports that an error occured and the oracle could not send a value."""
    error: bool

    """Value the oracle is responding with for this update."""
    value: Decimal

    """
    The minimum value this oracle has seen this round for the jobs listed in the
    aggregator.
    """
    min_response: Decimal

    """
    The maximum value this oracle has seen this round for the jobs listed in the
    aggregator.
    """
    max_response: Decimal

    """List of OracleJobs that were performed to produce this result"""
    jobs: list[OracleJob]

    """Authority of the queue the aggregator is attached to"""
    queue_authority: PublicKey

    """Program token mint"""
    token_mint: PublicKey

    """List of parsed oracles"""
    oracles: list[Any]

Index in the list of oracles in the aggregator assigned to this round update.

#   AggregatorSaveResultParams( oracle_idx: int, error: bool, value: decimal.Decimal, min_response: decimal.Decimal, max_response: decimal.Decimal, jobs: list[switchboardpy.OracleJob], queue_authority: solana.publickey.PublicKey, token_mint: solana.publickey.PublicKey, oracles: list[typing.Any] )
#   oracle_idx: int

Reports that an error occurred and the oracle could not send a value.

#   error: bool

Value the oracle is responding with for this update.

#   value: decimal.Decimal

The minimum value this oracle has seen this round for the jobs listed in the aggregator.

#   min_response: decimal.Decimal

The maximum value this oracle has seen this round for the jobs listed in the aggregator.

#   max_response: decimal.Decimal

List of OracleJobs that were performed to produce this result

Authority of the queue the aggregator is attached to

#   queue_authority: solana.publickey.PublicKey

Program token mint

#   token_mint: solana.publickey.PublicKey

List of parsed oracles

#  
@dataclass
class AggregatorSetHistoryBufferParams:
View Source
@dataclass
class AggregatorSetHistoryBufferParams:
    
    """Number of elements for the history buffer to fit"""
    size: int

    """Authority keypair for the aggregator"""
    authority: Keypair = None

Number of elements for the history buffer to fit

#   AggregatorSetHistoryBufferParams(size: int, authority: solana.keypair.Keypair = None)
#   size: int

Authority keypair for the aggregator

#   authority: solana.keypair.Keypair = None
#   class CrankAccount:
View Source
class CrankAccount:
    """ A Switchboard account representing a crank of aggregators ordered by next update time.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This crank's public key
        keypair (Keypair | None): this crank's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    
    """
    Get the size of a CrankAccount on chain

    Args:

    Returns:
        int: size of the CrankAccount type on chain
    """
    def size(self):
        return self.program.account["CrankAccountData"].size

    """
    Load and parse CrankAccount data based on the program IDL

    Args:
    
    Returns:
        CrankAccount

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await CrankAccountData.fetch(self.program.provider.connection, self.public_key)


    """
    Create and initialize the CrankAccount.

    Args:
        program (anchor.Program): Switchboard program representation holding connection and IDL.
        params (CrankInitParams)
    
    Returns:
        CrankAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: CrankInitParams):
        crank_account = Keypair.generate()
        buffer = Keypair.generate()
        size = program.account["CrankAccountData"].size
        max_rows = params.max_rows or 500
        crank_size = max_rows * 40 + 8
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(crank_size)
        lamports = response["result"]
        await program.rpc["crank_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 128),
                "crank_size": max_rows
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": crank_account.public_key,
                    "queue": params.queue_account.public_key,
                    "buffer": buffer.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                },
                signers=[crank_account, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=buffer.public_key,
                            lamports=lamports, 
                            space=size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )

        return CrankAccount(AccountParams(program=program, keypair=crank_account))

    """
    Pushes a new aggregator onto the crank
    
    Args:
        params (CrankPushParams): aggregator and related data
    
    Returns:
        TransactionSignature
    """
    async def push(self, params: CrankPushParams):
        aggregator_account: AggregatorAccount = params.aggregator_account
        crank = await self.load_data()
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=crank.queue_pubkey))
        queue = await queue_account.load_data()
        queue_authority = queue.authority
        lease_account, lease_bump = LeaseAccount.from_seed(self.program, queue_account, aggregator_account)
        lease: Any = None
        try:
            lease = await lease_account.load_data()
        except Exception:
            raise ValueError('A requested lease pda account has not been initialized.')
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_authority,
            queue_account.public_key,
            aggregator_account.public_key
        )
        try:
            await lease_account.load_data()
        except Exception:
            raise ValueError('A requested permission pda account has not been initialized.')
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        return await self.program.rpc["crank_push"](
            {
                "state_bump": state_bump,
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": self.public_key,
                    "aggregator": aggregator_account.public_key,
                    "oracle_queue": queue_account.public_key,
                    "queue_authority": queue_authority,
                    "permission": permission_account.public_key,
                    "lease": lease_account.public_key,
                    "escrow": lease.escrow,
                    "program_state": program_state_account.public_key,
                    "data_buffer": crank.data_buffer
                }
            )
        )


    """
    Pops a tx from the crank.

    Args:
        params (CrankPopParams)

    Returns:
        TransactionSignature    
    """
    async def pop_txn(self, params: CrankPopParams):
        fail_open_on_account_mismatch = params.fail_open_on_mismatch or False
        next = params.ready_pubkeys or await self.peak_next_ready(5)
        if len(next) == 0:
            raise ValueError('Crank is not ready to be turned')
        remaining_accounts: list[PublicKey] = []
        lease_bumps_map: Dict[str, int] = {}
        permission_bumps_map: Dict[str, int] = {}
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=params.queue_pubkey))
        for row in next:
            aggregator_account = AggregatorAccount(AccountParams(program=self.program, public_key=row))
            lease_account, lease_bump = LeaseAccount.from_seed(
                self.program,
                queue_account,
                aggregator_account
            )
            permission_account, permission_bump = PermissionAccount.from_seed(
                self.program,
                params.queue_authority,
                params.queue_pubkey,
                row
            )
            escrow = get_associated_token_address(
                lease_account.public_key,
                params.token_mint
            )
            remaining_accounts.append(aggregator_account.public_key)
            remaining_accounts.append(lease_account.public_key)
            remaining_accounts.append(escrow)
            remaining_accounts.append(permission_account.public_key)
            lease_bumps_map[row.to_base58()] = lease_bump
            permission_bumps_map[row.to_base58()] = permission_bump
        remaining_accounts.sort(key=lambda key : bytes(key))
        crank = params.crank
        queue = params.queue
        lease_bumps: list[int] = []
        permission_bumps: list[int] = []
        for key in remaining_accounts:
            lease_bumps.append(lease_bumps_map.get(key.to_base58()) or 0)
            permission_bumps.append(permission_bumps_map.get(key.to_base58()) or 0)
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        return self.program.transaction["crank_pop"](
            {
                "state_bump": state_bump,
                "lease_bumps": bytes(lease_bumps),
                "permission_bumps": bytes(permission_bumps),
                "nonce": params.nonce or None,
                "fail_open_on_account_mismatch": fail_open_on_account_mismatch
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": self.public_key,
                    "oracle_queue": params.queue_pubkey,
                    "queue_authority": params.queue_authority,
                    "program_state": program_state_account.public_key,
                    "payout_wallet": params.payout_wallet,
                    "token_program": TOKEN_PROGRAM_ID,
                    "crank_data_buffer": crank.data_buffer,
                    "queue_data_buffer": queue.data_buffer
                },
                remaining_accounts=[{ "is_signer": False, "is_writable": True, "pubkey": pubkey } for pubkey in remaining_accounts],
                signers=[payer_keypair]
            )
        )

    """
    Pops an aggregator from the crank

    Args:
        params (CrankPopParams)
    
    Returns:
        TransactionSignature
    """
    async def pop(self, params: CrankPopParams):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        txn = await self.pop_txn(params)
        return await self.program.provider.connection.send_transaction(txn, [payer_keypair])
    
    """
    Get an array of the next aggregator pubkeys to be popped from the crank, limited by n

    Args:
        n (int): limit of pubkeys to return

    Returns:
        list[CrankRow]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp
    """
    async def peak_next_with_time(self, n: int):
        crank = await self.load_data()

        # get list slice of length pq_size 
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]

        # sort by CrankRow next timestamp
        pq_data.sort(key=lambda crank_row: crank_row.next_timestamp)

        # return items
        return pq_data[:n]

    """
    Get an array of the next readily updateable aggregator pubkeys to be popped
    from the crank, limited by n

    Args:
        n (Optional[int]): limit of pubkeys to return

    Returns:
        list[PublicKey]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp
    """
    async def peak_next_ready(self, n: Optional[int] = None):
        now = math.floor(time.time())
        crank = await self.load_data()
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]
        key = lambda crank_row: crank_row.next_timestamp
        # list.sort() returns None, so use sorted() to keep the filtered rows ordered
        ready = sorted(filter(lambda item: now >= item.next_timestamp, pq_data), key=key)
        return [item.pubkey for item in ready[:(n or len(pq_data))]]
        
    """
    Get an array of the next aggregator pubkeys to be popped from the crank, limited by n

    Args:
        n (int): limit of pubkeys to return

    Returns:
        list[PublicKey]: Pubkey list of Aggregators and next timestamp to be popped, ordered by timestamp
    """
    async def peak_next(self, n: int):
        crank = await self.load_data()
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]
        pq_data.sort(key=lambda crank_row: crank_row.next_timestamp)
        return [item.pubkey for item in pq_data[:n]]

A Switchboard account representing a crank of aggregators ordered by next update time.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This crank's public key
    keypair (Keypair | None): this crank's keypair

#   CrankAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Get the size of a CrankAccount on chain

Returns:
    int: size of the CrankAccount type on chain

#   def size(self):
View Source
    def size(self):
        return self.program.account["CrankAccountData"].size
#   async def load_data(self):
View Source
    async def load_data(self):
        return await CrankAccountData.fetch(self.program.provider.connection, self.public_key)
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.CrankInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: CrankInitParams):
        crank_account = Keypair.generate()
        buffer = Keypair.generate()
        size = program.account["CrankAccountData"].size
        max_rows = params.max_rows or 500
        crank_size = max_rows * 40 + 8
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(crank_size)
        lamports = response["result"]
        await program.rpc["crank_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 128),
                "crank_size": max_rows
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": crank_account.public_key,
                    "queue": params.queue_account.public_key,
                    "buffer": buffer.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                },
                signers=[crank_account, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=buffer.public_key,
                            lamports=lamports, 
                            space=size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )

        return CrankAccount(AccountParams(program=program, keypair=crank_account))
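For instance, creating a crank for an existing queue might look like the following sketch; program and queue_account are assumed to exist, and the name and row count are arbitrary:

    # Sketch: create a crank attached to an existing oracle queue.
    from switchboardpy import CrankAccount, CrankInitParams

    async def create_example_crank(program, queue_account):
        return await CrankAccount.create(program, CrankInitParams(
            queue_account=queue_account,
            name=b"EXAMPLE_CRANK",
            max_rows=100,
        ))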
#   async def push(self, params: switchboardpy.CrankPushParams):
View Source
    async def push(self, params: CrankPushParams):
        aggregator_account: AggregatorAccount = params.aggregator_account
        crank = await self.load_data()
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=crank.queue_pubkey))
        queue = await queue_account.load_data()
        queue_authority = queue.authority
        lease_account, lease_bump = LeaseAccount.from_seed(self.program, queue_account, aggregator_account)
        lease: Any = None
        try:
            lease = await lease_account.load_data()
        except Exception:
            raise ValueError('A requested lease pda account has not been initialized.')
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_authority,
            queue_account.public_key,
            aggregator_account.public_key
        )
        try:
            await lease_account.load_data()
        except Exception:
            raise ValueError('A requested permission pda account has not been initialized.')
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        return await self.program.rpc["crank_push"](
            {
                "state_bump": state_bump,
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": self.public_key,
                    "aggregator": aggregator_account.public_key,
                    "oracle_queue": queue_account.public_key,
                    "queue_authority": queue_authority,
                    "permission": permission_account.public_key,
                    "lease": lease_account.public_key,
                    "escrow": lease.escrow,
                    "program_state": program_state_account.public_key,
                    "data_buffer": crank.data_buffer
                }
            )
        )
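Pushing an existing feed onto the crank is then a single call (sketch, inside an async context; crank and aggregator_account are assumed to exist):

    # Sketch: queue an aggregator for periodic updates on this crank.
    from switchboardpy import CrankPushParams

    signature = await crank.push(CrankPushParams(aggregator_account=aggregator_account))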
#   async def pop_txn(self, params: switchboardpy.CrankPopParams):
View Source
    async def pop_txn(self, params: CrankPopParams):
        fail_open_on_account_mismatch = params.fail_open_on_mismatch or False
        next = params.ready_pubkeys or await self.peak_next_ready(5)
        if len(next) == 0:
            raise ValueError('Crank is not ready to be turned')
        remaining_accounts: list[PublicKey] = []
        lease_bumps_map: Dict[str, int] = {}
        permission_bumps_map: Dict[str, int] = {}
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=params.queue_pubkey))
        for row in next:
            aggregator_account = AggregatorAccount(AccountParams(program=self.program, public_key=row))
            lease_account, lease_bump = LeaseAccount.from_seed(
                self.program,
                queue_account,
                aggregator_account
            )
            permission_account, permission_bump = PermissionAccount.from_seed(
                self.program,
                params.queue_authority,
                params.queue_pubkey,
                row
            )
            escrow = get_associated_token_address(
                lease_account.public_key,
                params.token_mint
            )
            remaining_accounts.append(aggregator_account.public_key)
            remaining_accounts.append(lease_account.public_key)
            remaining_accounts.append(escrow)
            remaining_accounts.append(permission_account.public_key)
            lease_bumps_map[row.to_base58()] = lease_bump
            permission_bumps_map[row.to_base58()] = permission_bump
        remaining_accounts.sort(key=lambda key : bytes(key))
        crank = params.crank
        queue = params.queue
        lease_bumps: list[int] = []
        permission_bumps: list[int] = []
        for key in remaining_accounts:
            lease_bumps.append(lease_bumps_map.get(key.to_base58()) or 0)
            permission_bumps.append(permission_bumps_map.get(key.to_base58()) or 0)
        program_state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        return self.program.transaction["crank_pop"](
            {
                "state_bump": state_bump,
                "lease_bumps": bytes(lease_bumps),
                "permission_bumps": bytes(permission_bumps),
                "nonce": params.nonce or None,
                "fail_open_on_account_mismatch": fail_open_on_account_mismatch
            },
            ctx=anchorpy.Context(
                accounts={
                    "crank": self.public_key,
                    "oracle_queue": params.queue_pubkey,
                    "queue_authority": params.queue_authority,
                    "program_state": program_state_account.public_key,
                    "payout_wallet": params.payout_wallet,
                    "token_program": TOKEN_PROGRAM_ID,
                    "crank_data_buffer": crank.data_buffer,
                    "queue_data_buffer": queue.data_buffer
                },
                remaining_accounts=[{ "is_signer": False, "is_writable": True, "pubkey": pubkey } for pubkey in remaining_accounts],
                signers=[payer_keypair]
            )
        )
#   async def pop(self, params: switchboardpy.CrankPopParams):
View Source
    async def pop(self, params: CrankPopParams):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        txn = await self.pop_txn(params)
        return await self.program.provider.connection.send_transaction(txn, [payer_keypair])
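A rough sketch of turning the crank once, inside an async context. Here crank_data and queue_data are the previously loaded crank/queue account structs, payout_wallet is the caller's token account for the queue's mint, and token_mint is that mint; all of these are placeholders:

    # Sketch: pop the next ready feeds and collect the reward into payout_wallet.
    from switchboardpy import CrankPopParams

    signature = await crank.pop(CrankPopParams(
        payout_wallet=payout_wallet,
        queue_pubkey=queue_account.public_key,
        queue_authority=queue_data.authority,
        crank=crank_data,
        queue=queue_data,
        token_mint=token_mint,
    ))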
#   async def peak_next_with_time(self, n: int):
View Source
    async def peak_next_with_time(self, n: int):
        crank = await self.load_data()

        # get list slice of length pq_size 
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]

        # sort by CrankRow next timestamp
        pq_data.sort(key=lambda crank_row: crank_row.next_timestamp)

        # return items
        return pq_data[:n]
#   async def peak_next_ready(self, n: Optional[int] = None):
View Source
    async def peak_next_ready(self, n: Optional[int] = None):
        now = math.floor(time.time())
        crank = await self.load_data()
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]
        key = lambda crank_row: crank_row.next_timestamp
        # list.sort() returns None, so use sorted() to keep the filtered rows ordered
        ready = sorted(filter(lambda item: now >= item.next_timestamp, pq_data), key=key)
        return [item.pubkey for item in ready[:(n or len(pq_data))]]
#   async def peak_next(self, n: int):
View Source
    async def peak_next(self, n: int):
        crank = await self.load_data()
        pq_data: list[CrankRow] = crank.pq_data[:crank.pq_size]
        pq_data.sort(key=lambda crank_row: crank_row.next_timestamp)
        return [item.pubkey for item in pq_data[:n]]
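To inspect the crank without popping anything, the peak_* helpers can be called directly (sketch, inside an async context; crank is an existing CrankAccount):

    # Sketch: look at what is due next on the crank.
    ready_pubkeys = await crank.peak_next_ready(10)        # feeds whose next_timestamp has passed
    upcoming_rows = await crank.peak_next_with_time(10)    # CrankRow entries ordered by next_timestamp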
#  
@dataclass
class CrankPopParams:
View Source
@dataclass
class CrankPopParams:

    """Specifies the wallet to reward for turning the crank."""
    payout_wallet: PublicKey

    """The pubkey of the linked oracle queue."""
    queue_pubkey: PublicKey

    """The pubkey of the linked oracle queue authority."""
    queue_authority: PublicKey

    """CrankAccount data"""
    crank: Any

    """QueueAccount data"""
    queue: Any

    """Token mint pubkey"""
    token_mint: PublicKey

    """
    Array of pubkeys to attempt to pop. If omitted, this will be loaded
    from the crank upon calling.
    """
    ready_pubkeys: list[PublicKey] = None

    """Nonce to allow consecutive crank pops with the same blockhash."""
    nonce: int = None
    fail_open_on_mismatch: bool = None

Specifies the wallet to reward for turning the crank.

#   CrankPopParams( payout_wallet: solana.publickey.PublicKey, queue_pubkey: solana.publickey.PublicKey, queue_authority: solana.publickey.PublicKey, crank: Any, queue: Any, token_mint: solana.publickey.PublicKey, ready_pubkeys: list[solana.publickey.PublicKey] = None, nonce: int = None, fail_open_on_mismatch: bool = None )
#   payout_wallet: solana.publickey.PublicKey

The pubkey of the linked oracle queue.

#   queue_pubkey: solana.publickey.PublicKey

The pubkey of the linked oracle queue authority.

#   queue_authority: solana.publickey.PublicKey

CrankAccount data

#   crank: Any

QueueAccount data

#   queue: Any

Token mint pubkey

#   token_mint: solana.publickey.PublicKey

Array of pubkeys to attempt to pop. If omitted, this will be loaded from the crank upon calling.

#   ready_pubkeys: list[solana.publickey.PublicKey] = None

Nonce to allow consecutive crank pops with the same blockhash.

#   nonce: int = None
#   fail_open_on_mismatch: bool = None
#  
@dataclass
class CrankInitParams:
View Source
@dataclass
class CrankInitParams:

    """OracleQueueAccount for which this crank is associated"""
    queue_account: OracleQueueAccount

    """Buffer specifying crank name"""
    name: bytes = None

    """Buffer specifying crank metadata"""
    metadata: bytes = None

    """Optional max number of rows"""
    max_rows: int = None

OracleQueueAccount for which this crank is associated

#   CrankInitParams( queue_account: switchboardpy.OracleQueueAccount, name: bytes = None, metadata: bytes = None, max_rows: int = None )

Buffer specifying crank name

#   name: bytes = None

Buffer specifying crank metadata

#   metadata: bytes = None

Optional max number of rows

#   max_rows: int = None
#  
@dataclass
class CrankPushParams:
View Source
@dataclass
class CrankPushParams:
    aggregator_account: AggregatorAccount

CrankPushParams(aggregator_account: switchboardpy.AggregatorAccount)

#   CrankPushParams(aggregator_account: switchboardpy.AggregatorAccount)
#  
@dataclass
class CrankRow:
View Source
@dataclass
class CrankRow:

    """Aggregator account pubkey"""
    pubkey: PublicKey

    """Next aggregator update timestamp to order the crank by"""
    next_timestamp: int

    @staticmethod
    def from_bytes(buf: bytes):
        pass

Aggregator account pubkey

#   CrankRow(pubkey: solana.publickey.PublicKey, next_timestamp: int)
#   pubkey: solana.publickey.PublicKey

Next aggregator update timestamp to order the crank by

#  
@staticmethod
def from_bytes(buf: bytes):
View Source
    @staticmethod
    def from_bytes(buf: bytes):
        pass
#   class JobAccount:
View Source
class JobAccount:
    """ A Switchboard account representing a job for an oracle to perform, stored as
        a protocol buffer.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This aggregator's public key
        keypair (Keypair | None): this aggregator's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    
    """
    Load and parse JobAccount state based on the program IDL. 
    
    Returns:
        name (JobAccount): data parsed in accordance with the
            Switchboard IDL.

    Args:

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await JobAccountData.fetch(self.program.provider.connection, self.public_key)


    """
    Load and parse the protobuf from the raw buffer stored in the JobAccount.
    
    Returns:
        OracleJob

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_job(self):
        job = await self.load_data()
        return parseOracleJob(job.data)

    """
    Load and parse JobAccount data based on the program IDL from a buffer.
    
    Args:
        program (anchorpy.Program)
        buf (bytes): Bytes representation of the JobAccount

    Returns:
        Any: JobAccountData parsed in accordance with the
            Switchboard IDL.
    """
    @staticmethod
    def decode(program: anchorpy.Program, buf: bytes):
        coder = anchorpy.Coder(program.idl)
        return coder.accounts.decode(buf)
    
    """
    Create and initialize the JobAccount

    Args:
        program (anchor.Program)
        params (JobInitParams)

    Returns:
        JobAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: JobInitParams):

        job_account = params.keypair or Keypair.generate()
        size = 280 + len(params.data) + (len(''.join(params.variables)) if params.variables else 0)
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        state = await state_account.load_data()
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        await program.rpc["job_init"](
            {
                "name": params.name or bytes([0] * 32),
                "expiration": params.expiration or 0,
                "data": params.data,
                "variables": [bytes(b'') for _ in params.variables] if params.variables else [],
                "state_bump": state_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "job": job_account.public_key,
                    "authority": params.authority or state.token_vault,
                    "program_state": state_account.public_key
                },
                signers=[job_account],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=job_account.public_key,
                            lamports=lamports, 
                            space=size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
        return JobAccount(AccountParams(program=program, keypair=job_account))

A Switchboard account representing a job for an oracle to perform, stored as a protocol buffer.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This aggregator's public key
    keypair (Keypair | None): this aggregator's keypair

#   JobAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Load and parse JobAccount state based on the program IDL.

Returns:
    name (JobAccount): data parsed in accordance with the Switchboard IDL.

Raises:
    AccountDoesNotExistError: If the account doesn't exist.
    AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.

#   async def load_data(self):
View Source
    async def load_data(self):
        return await JobAccountData.fetch(self.program.provider.connection, self.public_key)
#   async def load_job(self):
View Source
    async def load_job(self):
        job = await self.load_data()
        return parseOracleJob(job.data)
#  
@staticmethod
def decode(program: anchorpy.program.core.Program, buf: bytes):
View Source
    @staticmethod
    def decode(program: anchorpy.Program, buf: bytes):
        coder = anchorpy.Coder(program.idl)
        return coder.accounts.decode(buf)
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.JobInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: JobInitParams):

        job_account = params.keypair or Keypair.generate()
        size = 280 + len(params.data) + (len(''.join(params.variables)) if params.variables else 0)
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        state = await state_account.load_data()
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(size)
        lamports = response["result"]
        await program.rpc["job_init"](
            {
                "name": params.name or bytes([0] * 32),
                "expiration": params.expiration or 0,
                "data": params.data,
                "variables": [bytes(b'') for _ in params.variables] if params.variables else [],
                "state_bump": state_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "job": job_account.public_key,
                    "authority": params.authority or state.token_vault,
                    "program_state": state_account.public_key
                },
                signers=[job_account],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=job_account.public_key,
                            lamports=lamports, 
                            space=size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
        return JobAccount(AccountParams(program=program, keypair=job_account))
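A minimal job-creation sketch, inside an async context; serialized_job is assumed to be the bytes of an already-serialized OracleJob protocol buffer prepared elsewhere:

    # Sketch: store a pre-serialized OracleJob protobuf in a new JobAccount.
    from switchboardpy import JobAccount, JobInitParams

    job = await JobAccount.create(program, JobInitParams(
        data=serialized_job,
        name=b"EXAMPLE_JOB",
    ))
    oracle_job = await job.load_job()   # parse the stored protobuf back into an OracleJob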
#  
@dataclass
class JobInitParams:
View Source
@dataclass
class JobInitParams:

    """A serialized protocol buffer holding the schema of the job."""
    data: bytes

    """An optional name to apply to the job account."""
    name: bytes = None

    """unix_timestamp of when funds can be withdrawn from this account."""
    expiration: int = None

    """A required variables oracles must fill to complete the job."""
    variables: list[str] = None

    """A pre-generated keypair to use."""
    keypair: Keypair = None

    """
    An optional wallet for receiving kickbacks from job usage in feeds.
    """
    authority: PublicKey = None

A serialized protocol buffer holding the schema of the job.

#   JobInitParams( data: bytes, name: bytes = None, expiration: int = None, variables: list[str] = None, keypair: solana.keypair.Keypair = None, authority: solana.publickey.PublicKey = None )
#   data: bytes

An optional name to apply to the job account.

#   name: bytes = None

unix_timestamp of when funds can be withdrawn from this account.

#   expiration: int = None

A required variables oracles must fill to complete the job.

#   variables: list[str] = None

A pre-generated keypair to use.

#   keypair: solana.keypair.Keypair = None

An optional wallet for receiving kickbacks from job usage in feeds.

#   authority: solana.publickey.PublicKey = None
#   class LeaseAccount:
View Source
class LeaseAccount:
    """ A Switchboard account representing a lease for managing funds for oracle payouts
    for fulfilling feed updates.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This lease's public key
        keypair (Keypair | None): this lease's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair

    """
    Get the size of a LeaseAccount on chain

    Args:

    Returns:
        int: size of the LeaseAccount type on chain
    """
    def size(self):
        return self.program.account["LeaseAccountData"].size

    """
    Load and parse LeaseAccount data based on the program IDL

    Args:
    
    Returns:
        LeaseAccount

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await LeaseAccountData.fetch(self.program.provider.connection, self.public_key)


    """
    Loads a LeaseAccount from the expected PDA seed format

    Args:
        program (anchorpy.Program)
        queue_account (OracleQueueAccount)
        aggregator_account (AggregatorAccount)

    Returns:
        Tuple[LeaseAccount, int]: LeaseAccount and PDA bump
    """
    @staticmethod
    def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, aggregator_account: AggregatorAccount):
        pubkey, bump = publickey.PublicKey.find_program_address(
            [
                bytes(b'LeaseAccountData'), 
                bytes(queue_account.public_key),
                bytes(aggregator_account.public_key),
            ],
            program.program_id
        )
    
        return LeaseAccount(AccountParams(program=program, public_key=pubkey)), bump

    """
    Create and initialize the LeaseAccount.

    Args:
        program (anchor.Program): Switchboard program representation holding connection and IDL.
        params (LeaseInitParams)
    
    Returns:
        LeaseAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: LeaseInitParams):
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        switch_token_mint = await program_state_account.get_token_mint()
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            params.oracle_queue_account,
            params.aggregator_account
        )


        job_account_data = await params.aggregator_account.load_jobs()
        aggregator_account_data = await params.aggregator_account.load_data()
        job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size]
        job_wallets: list[PublicKey] = []
        wallet_bumps: list[int] = []
        for job in job_account_data:
            authority = job.account.authority or PublicKey('11111111111111111111111111111111')
            pubkey, bump = publickey.PublicKey.find_program_address(
                [
                    bytes(authority), 
                    bytes(TOKEN_PROGRAM_ID),
                    bytes(switch_token_mint.pubkey),
                ],
                ASSOCIATED_TOKEN_PROGRAM_ID
            )
            job_wallets.append(pubkey)
            wallet_bumps.append(bump)

        escrow = await switch_token_mint.create_associated_token_account(lease_account.public_key, skip_confirmation=False)
        await program.rpc["lease_init"](
            {
                "load_amount": params.load_amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump,
                "withdraw_authority": params.withdraw_authority or PublicKey('11111111111111111111111111111111'),
                "wallet_bumps": bytes(wallet_bumps)
            },
            ctx=anchorpy.Context(
                accounts={
                    "program_state": program_state_account.public_key,
                    "lease": lease_account.public_key,
                    "queue": params.oracle_queue_account.public_key,
                    "aggregator": params.aggregator_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "funder": params.funder,
                    "payer": program.provider.wallet.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "escrow": escrow,
                    "owner": params.funder_authority.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.funder_authority],
                remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]]
            )
        )
        return LeaseAccount(AccountParams(program=program, public_key=lease_account.public_key))

    """
    Get lease balance

    Args:
    Returns:
        int balance
    """
    async def get_balance(self):
        lease = await self.load_data()
        return await self.program.provider.connection.get_balance(lease.escrow)

    """
    Adds fund to a LeaseAccount. Note that funds can always be withdrawn by
    the withdraw authority if one was set on lease initialization.

    Args:
        program (anchor.Program): Switchboard program representation holding connection and IDL.
        params (LeaseExtendParams)
    
    Returns:
        TransactionSignature
    """
    async def extend(self, params: LeaseExtendParams):
        program = self.program
        lease = await self.load_data()
        escrow = lease.escrow
        queue = lease.queue
        aggregator = lease.aggregator
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue))
        switch_token_mint = await queue_account.load_mint()
        aggregator_account = AggregatorAccount(AccountParams(program=program, public_key=aggregator))
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            queue_account,
            aggregator_account
        )
        job_account_data = await aggregator_account.load_jobs()
        aggregator_account_data = await aggregator_account.load_data()
        job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size]
        job_wallets: list[PublicKey] = []
        wallet_bumps: list[int] = []
        for job in job_account_data:
            authority = job.account.authority or PublicKey('11111111111111111111111111111111')
            pubkey, bump = publickey.PublicKey.find_program_address(
                [
                    bytes(authority), 
                    bytes(TOKEN_PROGRAM_ID),
                    bytes(switch_token_mint.pubkey),
                ],
                ASSOCIATED_TOKEN_PROGRAM_ID
            )
            job_wallets.append(pubkey)
            wallet_bumps.append(bump)

        return await program.rpc["lease_extend"](
            {
                "load_amount": params.load_amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump,
                "wallet_bumps": bytes(wallet_bumps)
            },
            ctx=anchorpy.Context(
                accounts={
                    "lease": lease_account.public_key,
                    "aggregator": aggregator,
                    "queue": queue,
                    "funder": params.funder,
                    "owner": params.funder_authority.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "escrow": escrow,
                    "program_state": program_state_account.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.funder_authority],
                remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]]
            )
        )
 
    """
    Withdraw stake and/or rewards from a LeaseAccount.

    Args:
        params (LeaseWithdrawParams)
    
    Returns:
        TransactionSignature

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def withdraw(self, params: LeaseWithdrawParams):
        program = self.program
        lease = await self.load_data()
        escrow = lease.escrow
        queue = lease.queue
        aggregator = lease.aggregator
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue))
        switch_token_mint = await queue_account.load_mint()
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            OracleQueueAccount(AccountParams(program=program, public_key=queue)),
            AggregatorAccount(AccountParams(program=program, public_key=aggregator))
        )
        return await self.program.rpc["lease_withdraw"](
            {
                "amount": params.amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "lease": lease_account.public_key,
                    "escrow": escrow,
                    "aggregator": aggregator,
                    "queue": queue,
                    "withdraw_authority": params.withdraw_authority.public_key,
                    "withdraw_account": params.withdraw_wallet,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": program_state_account.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.withdraw_authority]
            )
        )

A Switchboard account representing a lease for managing funds for oracle payouts for fulfilling feed updates.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This lease's public key
    keypair (Keypair | None): this lease's keypair

#   LeaseAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Get the size of a LeaseAccount on chain

Returns:
    int: size of the LeaseAccount type on chain

#   def size(self):
View Source
    def size(self):
        return self.program.account["LeaseAccountData"].size
#   async def load_data(self):
View Source
    async def load_data(self):
        return await LeaseAccountData.fetch(self.program.provider.connection, self.public_key)
#  
@staticmethod
def from_seed( program: anchorpy.program.core.Program, queue_account: switchboardpy.OracleQueueAccount, aggregator_account: switchboardpy.AggregatorAccount ):
View Source
    @staticmethod
    def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, aggregator_account: AggregatorAccount):
        pubkey, bump = publickey.PublicKey.find_program_address(
            [
                bytes(b'LeaseAccountData'), 
                bytes(queue_account.public_key),
                bytes(aggregator_account.public_key),
            ],
            program.program_id
        )
    
        return LeaseAccount(AccountParams(program=program, public_key=pubkey)), bump
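Because the lease is a PDA, its address can be derived locally without a network call (sketch; program, queue_account, and aggregator_account are assumed to exist):

    # Sketch: derive the lease PDA for a (queue, aggregator) pair.
    from switchboardpy import LeaseAccount

    lease_account, lease_bump = LeaseAccount.from_seed(program, queue_account, aggregator_account)
    print(lease_account.public_key, lease_bump)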
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.LeaseInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: LeaseInitParams):
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        switch_token_mint = await program_state_account.get_token_mint()
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            params.oracle_queue_account,
            params.aggregator_account
        )


        job_account_data = await params.aggregator_account.load_jobs()
        aggregator_account_data = await params.aggregator_account.load_data()
        job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size]
        job_wallets: list[PublicKey] = []
        wallet_bumps: list[int] = []
        for job in job_account_data:
            authority = job.account.authority or PublicKey('11111111111111111111111111111111')
            pubkey, bump = publickey.PublicKey.find_program_address(
                [
                    bytes(authority), 
                    bytes(TOKEN_PROGRAM_ID),
                    bytes(switch_token_mint.pubkey),
                ],
                ASSOCIATED_TOKEN_PROGRAM_ID
            )
            job_wallets.append(pubkey)
            wallet_bumps.append(bump)

        escrow = await switch_token_mint.create_associated_token_account(lease_account.public_key, skip_confirmation=False)
        await program.rpc["lease_init"](
            {
                "load_amount": params.load_amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump,
                "withdraw_authority": params.withdraw_authority or PublicKey('11111111111111111111111111111111'),
                "wallet_bumps": bytes(wallet_bumps)
            },
            ctx=anchorpy.Context(
                accounts={
                    "program_state": program_state_account.public_key,
                    "lease": lease_account.public_key,
                    "queue": params.oracle_queue_account.public_key,
                    "aggregator": params.aggregator_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "funder": params.funder,
                    "payer": program.provider.wallet.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "escrow": escrow,
                    "owner": params.funder_authority.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.funder_authority],
                remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]]
            )
        )
        return LeaseAccount(AccountParams(program=program, public_key=lease_account.public_key))
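A hedged usage sketch of LeaseAccount.create follows. `program` is assumed to be a ready anchorpy.Program; the queue and aggregator public keys, the funder token account, and the funder authority keypair are placeholders, and the load amount is a raw token amount in the mint's smallest unit.

    from solana.keypair import Keypair
    from solana.publickey import PublicKey
    from switchboardpy import (
        AccountParams,
        AggregatorAccount,
        LeaseAccount,
        LeaseInitParams,
        OracleQueueAccount,
    )

    async def fund_new_lease(
        program,
        queue_pubkey: PublicKey,
        aggregator_pubkey: PublicKey,
        funder_token_account: PublicKey,
        funder_authority: Keypair,
    ) -> LeaseAccount:
        queue = OracleQueueAccount(AccountParams(program=program, public_key=queue_pubkey))
        aggregator = AggregatorAccount(AccountParams(program=program, public_key=aggregator_pubkey))
        # Creates the lease escrow and moves load_amount tokens into it from the funder wallet.
        return await LeaseAccount.create(program, LeaseInitParams(
            load_amount=1_000_000_000,
            funder=funder_token_account,
            funder_authority=funder_authority,
            oracle_queue_account=queue,
            aggregator_account=aggregator,
        ))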
#   async def get_balance(self):
View Source
    async def get_balance(self):
        lease = await self.load_data()
        return await self.program.provider.connection.get_balance(lease.escrow)
#   async def extend(self, params: switchboardpy.LeaseExtendParams):
View Source
    async def extend(self, params: LeaseExtendParams):
        program = self.program
        lease = await self.load_data()
        escrow = lease.escrow
        queue = lease.queue
        aggregator = lease.aggregator
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue))
        switch_token_mint = await queue_account.load_mint()
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            OracleQueueAccount(AccountParams(program=program, public_key=queue)),
            AggregatorAccount(AccountParams(program=program, public_key=aggregator))
        )
        aggregator_account = AggregatorAccount(AccountParams(program=program, public_key=aggregator))
        job_account_data = await aggregator_account.load_jobs()
        aggregator_account_data = await aggregator_account.load_data()
        job_pubkeys: list[PublicKey] = aggregator_account_data.job_pubkeys_data[:aggregator_account_data.job_pubkeys_size]
        job_wallets: list[PublicKey] = []
        wallet_bumps: list[int] = []
        for job in job_account_data:
            authority = job.account.authority or PublicKey('11111111111111111111111111111111')
            pubkey, bump = publickey.PublicKey.find_program_address(
                [
                    bytes(authority), 
                    bytes(TOKEN_PROGRAM_ID),
                    bytes(switch_token_mint.pubkey),
                ],
                ASSOCIATED_TOKEN_PROGRAM_ID
            )
            job_wallets.append(pubkey)
            wallet_bumps.append(bump)

        return await program.rpc["lease_extend"](
            {
                "load_amount": params.load_amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump,
                "wallet_bumps": bytes(wallet_bumps)
            },
            ctx=anchorpy.Context(
                accounts={
                    "lease": lease_account.public_key,
                    "aggregator": aggregator,
                    "queue": queue,
                    "funder": params.funder,
                    "owner": params.funder_authority.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "escrow": escrow,
                    "program_state": program_state_account.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.funder_authority],
                remaining_accounts=[AccountMeta(is_signer=False, is_writable=True, pubkey=x) for x in [*job_pubkeys, *job_wallets]]
            )
        )
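A minimal sketch of topping up an existing lease with extend; the lease public key, funder token account, and funder authority keypair are placeholders.

    from switchboardpy import AccountParams, LeaseAccount, LeaseExtendParams

    async def top_up_lease(program, lease_pubkey, funder_token_account, funder_authority):
        lease = LeaseAccount(AccountParams(program=program, public_key=lease_pubkey))
        # Moves additional tokens from the funder wallet into the lease escrow.
        return await lease.extend(LeaseExtendParams(
            load_amount=500_000_000,
            funder=funder_token_account,
            funder_authority=funder_authority,
        ))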
#   async def withdraw(self, params: switchboardpy.LeaseWithdrawParams):
View Source
    async def withdraw(self, params: LeaseWithdrawParams):
        program = self.program
        lease = await self.load_data()
        escrow = lease.escrow
        queue = lease.queue
        aggregator = lease.aggregator
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        queue_account = OracleQueueAccount(AccountParams(program=program, public_key=queue))
        switch_token_mint = await queue_account.load_mint()
        lease_account, lease_bump = LeaseAccount.from_seed(
            program,
            OracleQueueAccount(AccountParams(program=program, public_key=queue)),
            AggregatorAccount(AccountParams(program=program, public_key=aggregator))
        )
        return await self.program.rpc["lease_withdraw"](
            {
                "amount": params.amount,
                "state_bump": state_bump,
                "lease_bump": lease_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "lease": lease_account.public_key,
                    "escrow": escrow,
                    "aggregator": aggregator,
                    "queue": queue,
                    "withdraw_authority": params.withdraw_authority.public_key,
                    "withdraw_account": params.withdraw_wallet,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": program_state_account.public_key,
                    "mint": switch_token_mint.pubkey
                },
                signers=[params.withdraw_authority]
            )
        )
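And a matching sketch for withdraw; the amount and keys are placeholders, and the withdraw authority must be the keypair registered on the lease.

    from switchboardpy import AccountParams, LeaseAccount, LeaseWithdrawParams

    async def reclaim_lease_funds(program, lease_pubkey, destination_token_account, withdraw_authority):
        lease = LeaseAccount(AccountParams(program=program, public_key=lease_pubkey))
        return await lease.withdraw(LeaseWithdrawParams(
            amount=250_000_000,
            withdraw_wallet=destination_token_account,
            withdraw_authority=withdraw_authority,
        ))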
#  
@dataclass
class LeaseExtendParams:
View Source
@dataclass
class LeaseExtendParams:

    """Token amount to load into the lease escrow"""
    load_amount: int

    """The funding wallet of the lease"""
    funder: PublicKey

    """The authority of the funding wallet"""
    funder_authority: Keypair

Token amount to load into the lease escrow

#   LeaseExtendParams( load_amount: int, funder: solana.publickey.PublicKey, funder_authority: solana.keypair.Keypair )
#   load_amount: int

The funding wallet of the lease

#   funder: solana.publickey.PublicKey

The authority of the funding wallet

#  
@dataclass
class LeaseInitParams:
View Source
@dataclass
class LeaseInitParams:

    """Token amount to load into the lease escrow"""
    load_amount: int

    """The funding wallet of the lease"""
    funder: PublicKey

    """The authority of the funding wallet"""
    funder_authority: Keypair

    """The target to which this lease is applied"""
    oracle_queue_account: OracleQueueAccount

    """The feed which the lease grants permission"""
    aggregator_account: AggregatorAccount

    """This authority will be permitted to withdraw funds from this lease"""
    withdraw_authority: PublicKey = None

Token amount to load into the lease escrow

#   LeaseInitParams( load_amount: int, funder: solana.publickey.PublicKey, funder_authority: solana.keypair.Keypair, oracle_queue_account: switchboardpy.OracleQueueAccount, aggregator_account: switchboardpy.AggregatorAccount, withdraw_authority: solana.publickey.PublicKey = None )
#   load_amount: int

The funding wallet of the lease

#   funder: solana.publickey.PublicKey

The authority of the funding wallet

#   funder_authority: solana.keypair.Keypair

The target to which this lease is applied

The feed to which this lease grants permission

This authority will be permitted to withdraw funds from this lease

#   withdraw_authority: solana.publickey.PublicKey = None
#  
@dataclass
class LeaseWithdrawParams:
View Source
@dataclass
class LeaseWithdrawParams:

    """Token amount to withdraw from the lease escrow"""
    amount: int

    """The wallet of to withdraw to"""
    withdraw_wallet: PublicKey

    """The withdraw authority of the lease"""
    withdraw_authority: Keypair

Token amount to withdraw from the lease escrow

#   LeaseWithdrawParams( amount: int, withdraw_wallet: solana.publickey.PublicKey, withdraw_authority: solana.keypair.Keypair )
#   amount: int

The wallet to withdraw to

#   withdraw_wallet: solana.publickey.PublicKey

The withdraw authority of the lease

#   class OracleAccount:
View Source
class OracleAccount:
    """ A Switchboard account representing an oracle account and its associated queue
    and escrow account.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This oracle's public key
        keypair (Keypair | None): this oracle's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    
    """
    Get the size of an OracleAccount on chain

    Args:

    Returns:
        int: size of the OracleAccount type on chain
    """
    def size(self):
        return self.program.account["OracleAccountData"].size

    """
    Load and parse OracleAccount data based on the program IDL

    Args:
    
    Returns:
        OracleAccountData

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await OracleAccountData.fetch(self.program.provider.connection, self.public_key)


    """
    Loads a OracleAccount from the expected PDA seed format

    Args:
        program (anchorpy.Program)
        queue_account (OracleQueueAccount)
        wallet (PublicKey)

    Returns:
        Tuple[OracleAccount, int]: OracleAccount and PDA bump
    """
    @staticmethod
    def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, wallet: PublicKey):
        oracle_pubkey, bump = PublicKey.find_program_address(
            [
                bytes(b'OracleAccountData'), 
                bytes(queue_account.public_key),
                bytes(wallet),
            ],
            program.program_id
        )
    
        return OracleAccount(AccountParams(program=program, public_key=oracle_pubkey)), bump

    """
    Create and initialize the OracleAccount.

    Args:
        program (anchor.Program): Switchboard program representation holding connection and IDL.
        params (OracleInitParams)
    
    Returns:
        OracleAccount

    """
    @staticmethod
    async def create(program: anchorpy.Program, params: OracleInitParams):
        payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key)
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        switch_token_mint = await program_state_account.get_token_mint()
        wallet = await switch_token_mint.create_account(program.provider.wallet.public_key)
        await switch_token_mint.set_authority(
            wallet,
            program_state_account.public_key,
            'AccountOwner',
            payer_keypair,
            []
        )
        oracle_account, oracle_bump = OracleAccount.from_seed(
            program,
            params.queue_account,
            wallet
        )

        await program.rpc["oracle_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 128),
                "state_bump": state_bump,
                "oracle_bump": oracle_bump,
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": oracle_account.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "queue": params.queue_account.public_key,
                    "wallet": wallet,
                    "program_state": program_state_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                }
            )
        )
        return OracleAccount(AccountParams(program=program, public_key=oracle_account.public_key))

    """
    Initiates a heartbeat for an OracleAccount, signifying the oracle is still healthy.

    Args:
    
    Returns:
        TransactionSignature

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def heartbeat(self):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        oracle = await self.load_data()
        queue_account = OracleQueueAccount(AccountParams(program=self.program,public_key=oracle.queue_pubkey))
        queue_data = await queue_account.load_data()
        last_pubkey = self.public_key
        if queue_data.size != 0:
            last_pubkey = queue_data.queue[queue_data.gc_idx]
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_data.authority,
            queue_account.public_key,
            self.public_key
        )
        try:
            await permission_account.load_data()
        except Exception:
            raise ValueError('A requested permission pda account has not been initialized.')

        return await self.program.rpc["oracle_heartbeat"](
            {
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": self.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "token_account": oracle.token_account,
                    "gc_oracle": last_pubkey,
                    "oracle_queue": queue_account.public_key,
                    "permission": permission_account.public_key,
                    "data_buffer": queue_data.data_buffer
                },
                signers=[self.keypair]
            )
        )


    """
    Withdraw stake and/or rewards from an OracleAccount.

    Args:
        params (OracleWithdrawParams)
    
    Returns:
        TransactionSignature

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def withdraw(self, params: OracleWithdrawParams):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        oracle = await self.load_data()
        queue_pubkey = oracle.queue_pubkey
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey))
        queue = await queue_account.load_data()
        queue_authority = queue.authority
        state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_authority,
            queue_account.public_key,
            self.public_key
        )
        return await self.program.rpc["oracle_withdraw"](
            {
                "permission_bump": permission_bump,
                "state_bump": state_bump,
                "amount": params.amount
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": self.public_key,
                    "oracle_authority": params.oracle_authority.public_key,
                    "token_account": oracle.token_account,
                    "withdraw_account": params.withdraw_account,
                    "oracle_queue": queue_account.public_key,
                    "permission": permission_account.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": state_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": self.program.provider.wallet.public_key
                },
                signers=[params.oracle_authority]
            )
        )

A Switchboard account representing an oracle account and its associated queue and escrow account.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This oracle's public key
    keypair (Keypair | None): This oracle's keypair

#   OracleAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Get the size of an OracleAccount on chain

Args:

Returns: int: size of the OracleAccount type on chain

#   def size(self):
View Source
    def size(self):
        return self.program.account["OracleAccountData"].size
#   async def load_data(self):
View Source
    async def load_data(self):
        return await OracleAccountData.fetch(self.program.provider.connection, self.public_key)
#  
@staticmethod
def from_seed( program: anchorpy.program.core.Program, queue_account: switchboardpy.OracleQueueAccount, wallet: solana.publickey.PublicKey ):
View Source
    @staticmethod
    def from_seed(program: anchorpy.Program, queue_account: OracleQueueAccount, wallet: PublicKey):
        oracle_pubkey, bump = PublicKey.find_program_address(
            [
                bytes(b'OracleAccountData'), 
                bytes(queue_account.public_key),
                bytes(wallet),
            ],
            program.program_id
        )
    
        return OracleAccount(AccountParams(program=program, public_key=oracle_pubkey)), bump
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.OracleInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: OracleInitParams):
        payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key)
        program_state_account, state_bump = ProgramStateAccount.from_seed(program)
        switch_token_mint = await program_state_account.get_token_mint()
        wallet = await switch_token_mint.create_account(program.provider.wallet.public_key)
        await switch_token_mint.set_authority(
            wallet,
            program_state_account.public_key,
            'AccountOwner',
            payer_keypair,
            []
        )
        oracle_account, oracle_bump = OracleAccount.from_seed(
            program,
            params.queue_account,
            wallet
        )

        await program.rpc["oracle_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 128),
                "state_bump": state_bump,
                "oracle_bump": oracle_bump,
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": oracle_account.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "queue": params.queue_account.public_key,
                    "wallet": wallet,
                    "program_state": program_state_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                }
            )
        )
        return OracleAccount(AccountParams(program=program, public_key=oracle_account.public_key))
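A hedged sketch of registering a new oracle on a queue. `program` is assumed to be a ready anchorpy.Program, the queue public key is a placeholder, and padding the name/metadata buffers to 32/128 bytes simply mirrors the defaults applied above.

    from switchboardpy import AccountParams, OracleAccount, OracleInitParams, OracleQueueAccount

    async def register_oracle(program, queue_pubkey):
        queue = OracleQueueAccount(AccountParams(program=program, public_key=queue_pubkey))
        return await OracleAccount.create(program, OracleInitParams(
            queue_account=queue,
            name=b"my-oracle".ljust(32, b"\x00"),     # 32-byte name buffer
            metadata=b"example".ljust(128, b"\x00"),  # 128-byte metadata buffer
        ))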
#   async def heartbeat(self):
View Source
    async def heartbeat(self):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        oracle = await self.load_data()
        queue_account = OracleQueueAccount(AccountParams(program=self.program,public_key=oracle.queue_pubkey))
        queue_data = await queue_account.load_data()
        last_pubkey = self.public_key
        if queue_data.size != 0:
            last_pubkey = queue_data.queue[queue_data.gc_idx]
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_data.authority,
            queue_account.public_key,
            self.public_key
        )
        try:
            await permission_account.load_data()
        except Exception:
            raise ValueError('A requested permission pda account has not been initialized.')

        return await self.program.rpc["oracle_heartbeat"](
            {
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": self.public_key,
                    "oracle_authority": payer_keypair.public_key,
                    "token_account": oracle.token_account,
                    "gc_oracle": last_pubkey,
                    "oracle_queue": queue_account.public_key,
                    "permission": permission_account.public_key,
                    "data_buffer": queue_data.data_buffer
                },
                signers=[self.keypair]
            )
        )
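Because heartbeat signs with the account's own keypair, the OracleAccount must be constructed from a keypair rather than just a public key. A minimal heartbeat loop might look like this (the interval and the lack of error handling are illustrative only):

    import asyncio

    from switchboardpy import AccountParams, OracleAccount

    async def heartbeat_forever(program, oracle_keypair, interval_seconds: int = 30):
        oracle = OracleAccount(AccountParams(program=program, keypair=oracle_keypair))
        while True:
            # Raises a ValueError if the queue permission PDA has not been initialized.
            await oracle.heartbeat()
            await asyncio.sleep(interval_seconds)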
#   async def withdraw(self, params: switchboardpy.OracleWithdrawParams):
View Source
    async def withdraw(self, params: OracleWithdrawParams):
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        oracle = await self.load_data()
        queue_pubkey = oracle.queue_pubkey
        queue_account = OracleQueueAccount(AccountParams(program=self.program, public_key=queue_pubkey))
        queue = await queue_account.load_data()
        queue_authority = queue.authority
        state_account, state_bump = ProgramStateAccount.from_seed(self.program)
        permission_account, permission_bump = PermissionAccount.from_seed(
            self.program,
            queue_authority,
            queue_account.public_key,
            self.public_key
        )
        return await self.program.rpc["oracle_withdraw"](
            {
                "permission_bump": permission_bump,
                "state_bump": state_bump,
                "amount": params.amount
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle": self.public_key,
                    "oracle_authority": params.oracle_authority.public_key,
                    "token_account": oracle.token_account,
                    "withdraw_account": params.withdraw_account,
                    "oracle_queue": queue_account.public_key,
                    "permission": permission_account.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                    "program_state": state_account.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": self.program.provider.wallet.public_key
                },
                signers=[params.oracle_authority]
            )
        )
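A hedged sketch of withdrawing oracle stake or rewards; the keys are placeholders, and the amount simply follows the OracleWithdrawParams annotation below (Decimal) without asserting how the program interprets it on chain.

    from decimal import Decimal

    from switchboardpy import AccountParams, OracleAccount, OracleWithdrawParams

    async def withdraw_oracle_rewards(program, oracle_pubkey, destination_token_account, oracle_authority):
        oracle = OracleAccount(AccountParams(program=program, public_key=oracle_pubkey))
        return await oracle.withdraw(OracleWithdrawParams(
            amount=Decimal(1),
            withdraw_account=destination_token_account,
            oracle_authority=oracle_authority,
        ))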
#  
@dataclass
class OracleInitParams:
View Source
@dataclass
class OracleInitParams:
    
    """Specifies the oracle queue to associate with this OracleAccount."""
    queue_account: OracleQueueAccount

    """Buffer specifying orace name"""
    name: bytes = None

    """Buffer specifying oralce metadata"""
    metadata: bytes = None

Specifies the oracle queue to associate with this OracleAccount.

#   OracleInitParams( queue_account: switchboardpy.OracleQueueAccount, name: bytes = None, metadata: bytes = None )

Buffer specifying oracle name

#   name: bytes = None

Buffer specifying oracle metadata

#   metadata: bytes = None
#  
@dataclass
class OracleWithdrawParams:
View Source
@dataclass
class OracleWithdrawParams:
    
    """Amount to withdraw"""
    amount: Decimal

    """Token Account to withdraw to"""
    withdraw_account: PublicKey

    """Oracle authority keypair"""
    oracle_authority: Keypair

Amount to withdraw

#   OracleWithdrawParams( amount: decimal.Decimal, withdraw_account: solana.publickey.PublicKey, oracle_authority: solana.keypair.Keypair )
#   amount: decimal.Decimal

Token Account to withdraw to

#   withdraw_account: solana.publickey.PublicKey

Oracle authority keypair

#   class OracleQueueAccount:
View Source
class OracleQueueAccount:
    """A Switchboard account representing a queue for distributing oracles to
    permitted data feeds.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This OracleQueueAccount's public key
        keypair (Keypair | None): this OracleQueueAccount's keypair
    """

    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair

    """
    Get the size of an OracleQueueAccount on chain

    Args:

    Returns:
        int: size of the OracleQueueAccount type on chain
    """
    def size(self):
        return self.program.account["OracleQueueAccountData"].size

    """
    Load and parse OracleQueueAccount data based on the program IDL

    Args:
    
    Returns:
        OracleQueueAccountData

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await OracleQueueAccountData.fetch(self.program.provider.connection, self.public_key)

    """
    Fetch the token mint for this queue
    Args:
    Returns:
        AsyncToken
    """
    async def load_mint(self) -> AsyncToken:
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        queue = await self.load_data()
        try:
            mint = AsyncToken(self.program.provider.connection, queue.mint, TOKEN_PROGRAM_ID, payer_keypair)
            return mint
        except AttributeError:
            return AsyncToken(self.program.provider.connection, WRAPPED_SOL_MINT, TOKEN_PROGRAM_ID, payer_keypair)

    """
    Create and initialize the OracleQueueAccount

    Args:
        program (anchor.Program)
        params (OracleQueueInitParams)

    Returns:
        OracleQueueAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: OracleQueueInitParams):
        oracle_queue_account = Keypair.generate()
        buffer = Keypair.generate()
        queue_size = params.queue_size or 500
        queue_size = queue_size * 32 + 8
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(queue_size)
        lamports = response["result"]
        await program.rpc["oracle_queue_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 64),
                "reward": params.reward or 0,
                "min_stake": params.min_stake or 0,
                "feed_probation_period": params.feed_probation_period or 0,
                "oracle_timeout": params.oracle_timeout or 180,
                "slashing_enabled": params.slashing_enabled or False,
                "variance_tolerance_multiplier": SwitchboardDecimal.from_decimal(params.variance_tolerance_multiplier or Decimal(2)).as_proper_sbd(program),
                "authority": params.authority,
                "consecutive_feed_failure_limit": params.consecutive_feed_failure_limit or 1000,
                "consecutive_oracle_failure_limit": params.consecutive_oracle_failure_limit or 1000,
                "minimum_delay_seconds": params.minimum_delay_seconds or 5,
                "queue_size": params.queue_size or 0,
                "unpermissioned_feeds": params.unpermissioned_feeds or False,
                "unpermissioned_vrf": params.unpermissioned_feeds or False,
                "enable_buffer_relayers": False
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle_queue": oracle_queue_account.public_key,
                    "authority": params.authority,
                    "buffer": buffer.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key,
                    "mint": params.mint
                },
                signers=[oracle_queue_account, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=buffer.public_key,
                            lamports=lamports, 
                            space=queue_size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
        return OracleQueueAccount(AccountParams(program=program, keypair=oracle_queue_account))

A Switchboard account representing a queue for distributing oracles to permitted data feeds.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This OracleQueueAccount's public key
    keypair (Keypair | None): This OracleQueueAccount's keypair

#   OracleQueueAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Get the size of an OracleQueueAccount on chain

Args:

Returns: int: size of the OracleQueueAccount type on chain

#   def size(self):
View Source
    def size(self):
        return self.program.account["OracleQueueAccountData"].size
#   async def load_data(self):
View Source
    async def load_data(self):
        return await OracleQueueAccountData.fetch(self.program.provider.connection, self.public_key)
#   async def load_mint(self) -> spl.token.async_client.AsyncToken:
View Source
    async def load_mint(self) -> AsyncToken:
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        queue = await self.load_data()
        try:
            mint = AsyncToken(self.program.provider.connection, queue.mint, TOKEN_PROGRAM_ID, payer_keypair)
            return mint
        except AttributeError:
            return AsyncToken(self.program.provider.connection, WRAPPED_SOL_MINT, TOKEN_PROGRAM_ID, payer_keypair)
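For instance, fetching the queue's mint address via load_mint (the queue public key is a placeholder and `program` is assumed to be a ready anchorpy.Program):

    from switchboardpy import AccountParams, OracleQueueAccount

    async def queue_mint_pubkey(program, queue_pubkey):
        queue = OracleQueueAccount(AccountParams(program=program, public_key=queue_pubkey))
        # Returns an AsyncToken client; falls back to wrapped SOL if the queue data has no mint field.
        mint = await queue.load_mint()
        return mint.pubkey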
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.OracleQueueInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: OracleQueueInitParams):
        oracle_queue_account = Keypair.generate()
        buffer = Keypair.generate()
        queue_size = params.queue_size or 500
        queue_size = queue_size * 32 + 8
        response = await program.provider.connection.get_minimum_balance_for_rent_exemption(queue_size)
        lamports = response["result"]
        await program.rpc["oracle_queue_init"](
            {
                "name": params.name or bytes([0] * 32),
                "metadata": params.metadata or bytes([0] * 64),
                "reward": params.reward or 0,
                "min_stake": params.min_stake or 0,
                "feed_probation_period": params.feed_probation_period or 0,
                "oracle_timeout": params.oracle_timeout or 180,
                "slashing_enabled": params.slashing_enabled or False,
                "variance_tolerance_multiplier": SwitchboardDecimal.from_decimal(params.variance_tolerance_multiplier or Decimal(2)).as_proper_sbd(program),
                "authority": params.authority,
                "consecutive_feed_failure_limit": params.consecutive_feed_failure_limit or 1000,
                "consecutive_oracle_failure_limit": params.consecutive_oracle_failure_limit or 1000,
                "minimum_delay_seconds": params.minimum_delay_seconds or 5,
                "queue_size": params.queue_size or 0,
                "unpermissioned_feeds": params.unpermissioned_feeds or False,
                "unpermissioned_vrf": params.unpermissioned_feeds or False,
                "enable_buffer_relayers": False
            },
            ctx=anchorpy.Context(
                accounts={
                    "oracle_queue": oracle_queue_account.public_key,
                    "authority": params.authority,
                    "buffer": buffer.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key,
                    "mint": params.mint
                },
                signers=[oracle_queue_account, buffer],
                pre_instructions=[
                    create_account(
                        CreateAccountParams(
                            from_pubkey=program.provider.wallet.public_key, 
                            new_account_pubkey=buffer.public_key,
                            lamports=lamports, 
                            space=queue_size, 
                            program_id=program.program_id
                        )
                    )
                ]
            )
        )
        return OracleQueueAccount(AccountParams(program=program, keypair=oracle_queue_account))
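A hedged sketch of creating a queue. `program` is assumed to be a ready anchorpy.Program, the wrapped SOL mint shown is only an assumption for the queue's payout mint, and the authority public key is a placeholder; omitted optional fields fall back to the defaults applied above.

    from solana.publickey import PublicKey
    from switchboardpy import OracleQueueAccount, OracleQueueInitParams

    # Assumption: use wrapped SOL as the queue's payout mint.
    WRAPPED_SOL_MINT = PublicKey("So11111111111111111111111111111111111111112")

    async def create_queue(program, authority_pubkey: PublicKey):
        return await OracleQueueAccount.create(program, OracleQueueInitParams(
            mint=WRAPPED_SOL_MINT,
            reward=0,
            min_stake=0,
            authority=authority_pubkey,
            oracle_timeout=180,
            queue_size=100,
            unpermissioned_feeds=False,
        ))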
#  
@dataclass
class OracleQueueInitParams:
View Source
@dataclass
class OracleQueueInitParams:

    """Mint for the oracle queue"""
    mint: PublicKey

    """Rewards to provide oracles and round openers on this queue."""
    reward: int

    """The minimum amount of stake oracles must present to remain on the queue."""
    min_stake: int

    """
    The account to delegate authority to for creating permissions targeted
    at the queue.
    """
    authority: PublicKey 

    """Time period we should remove an oracle after if no response."""
    oracle_timeout: int = None

    """
    The tolerated variance amount oracle results can have from the
    accepted round result before being slashed.
    slashBound = varianceToleranceMultiplier * stdDeviation
    Default: 2
    """
    variance_tolerance_multiplier: Decimal = None

    """Consecutive failure limit for a feed before feed permission is revoked."""
    consecutive_feed_failure_limit: int = None

    """
    Consecutive failure limit for an oracle before oracle permission is revoked.
    """
    consecutive_oracle_failure_limit: int = None

    """the minimum update delay time for Aggregators"""
    minimum_delay_seconds: int = None

    """Optionally set the size of the queue."""
    queue_size: int = None

    """
    Enabling this setting means data feeds do not need explicit permission
    to join the queue.
    """
    unpermissioned_feeds: bool = None

    """Whether slashing is enabled on this queue"""
    slashing_enabled: bool = None

    """
    After a feed lease is funded or re-funded, it must succeed N consecutive
    times or its authorization to use the queue is auto-revoked.
    """
    feed_probation_period: int = None

    """A name to assign to this OracleQueue."""
    name: bytes = None

    """Buffer for queue metadata."""
    metadata: bytes = None

    """
    Enabling this setting means data feeds do not need explicit permission
    to request VRF proofs and verifications from this queue.
    """
    unpermissioned_vrf: bool = None

Mint for the oracle queue

#   OracleQueueInitParams( mint: solana.publickey.PublicKey, reward: int, min_stake: int, authority: solana.publickey.PublicKey, oracle_timeout: int = None, variance_tolerance_multiplier: decimal.Decimal = None, consecutive_feed_failure_limit: int = None, consecutive_oracle_failure_limit: int = None, minimum_delay_seconds: int = None, queue_size: int = None, unpermissioned_feeds: bool = None, slashing_enabled: bool = None, feed_probation_period: int = None, name: bytes = None, metadata: bytes = None, unpermissioned_vrf: bool = None )
#   mint: solana.publickey.PublicKey

Rewards to provide oracles and round openers on this queue.

#   reward: int

The minimum amount of stake oracles must present to remain on the queue.

#   min_stake: int

The account to delegate authority to for creating permissions targeted at the queue.

#   authority: solana.publickey.PublicKey

Time period after which an oracle is removed from the queue if it has not responded.

#   oracle_timeout: int = None

The tolerated variance amount oracle results can have from the accepted round result before being slashed. slashBound = varianceToleranceMultiplier * stdDeviation Default: 2

#   variance_tolerance_multiplier: decimal.Decimal = None

Consecutive failure limit for a feed before feed permission is revoked.

#   consecutive_feed_failure_limit: int = None

Consecutive failure limit for an oracle before oracle permission is revoked.

#   consecutive_oracle_failure_limit: int = None

The minimum update delay time for Aggregators

#   minimum_delay_seconds: int = None

Optionally set the size of the queue.

#   queue_size: int = None

Enabling this setting means data feeds do not need explicit permission to join the queue.

#   unpermissioned_feeds: bool = None

Whether slashing is enabled on this queue

#   slashing_enabled: bool = None

After a feed lease is funded or re-funded, it must succeed N consecutive times or its authorization to use the queue is auto-revoked.

#   feed_probation_period: int = None

A name to assign to this OracleQueue.

#   name: bytes = None

Buffer for queue metadata.

#   metadata: bytes = None

Enabling this setting means data feeds do not need explicit permission to request VRF proofs and verifications from this queue.

#   unpermissioned_vrf: bool = None
#   class OracleJob(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage. An OracleJob is the protobuf definition of the ordered list of tasks an oracle executes to resolve a single feed value.

#   DESCRIPTOR = <google.protobuf.pyext._message.MessageDescriptor object>
#   tasks
Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
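To make the nested task messages concrete, here is a hedged sketch that builds and round-trips two of them using only fields listed in this reference; the URL and JSONPath are made-up placeholders. Composing tasks into OracleJob.tasks goes through the job's task wrapper message, which is not expanded in this excerpt, so the sketch stays with the standalone messages.

    from switchboardpy import OracleJob

    # Describe an HTTP fetch followed by a JSON parse (placeholder values).
    http = OracleJob.HttpTask()
    http.url = "https://example.com/api/price"
    http.method = OracleJob.HttpTask.METHOD_GET

    parse = OracleJob.JsonParseTask()
    parse.path = "$.price"

    # Protobuf messages round-trip through bytes via the inherited helpers.
    raw = http.SerializeToString()
    decoded = OracleJob.HttpTask()
    decoded.ParseFromString(raw)
    assert decoded.url == http.url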
#   class OracleJob.HttpTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.HttpTask
DESCRIPTOR
Header
url
method
headers
body
Method
METHOD_UNKOWN
METHOD_GET
METHOD_POST
#   class OracleJob.JsonParseTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.JsonParseTask
DESCRIPTOR
path
aggregation_method
AggregationMethod
NONE
MIN
MAX
SUM
#   class OracleJob.MedianTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.MedianTask
DESCRIPTOR
tasks
jobs
min_successful_required
#   class OracleJob.MeanTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.MeanTask
DESCRIPTOR
tasks
jobs
#   class OracleJob.MaxTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.MaxTask
DESCRIPTOR
tasks
jobs
#   class OracleJob.ValueTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.ValueTask
DESCRIPTOR
value
aggregator_pubkey
#   class OracleJob.WebsocketTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.WebsocketTask
DESCRIPTOR
url
subscription
max_data_age_seconds
filter
#   class OracleJob.ConditionalTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.ConditionalTask
DESCRIPTOR
attempt
on_failure
#   class OracleJob.DivideTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.DivideTask
DESCRIPTOR
scalar
aggregator_pubkey
job
#   class OracleJob.MultiplyTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.MultiplyTask
DESCRIPTOR
scalar
aggregator_pubkey
job
#   class OracleJob.AddTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.AddTask
DESCRIPTOR
scalar
aggregator_pubkey
job
#   class OracleJob.SubtractTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.SubtractTask
DESCRIPTOR
scalar
aggregator_pubkey
job
#   class OracleJob.LpTokenPriceTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.LpTokenPriceTask
DESCRIPTOR
mercurial_pool_address
saber_pool_address
orca_pool_address
raydium_pool_address
price_feed_addresses
price_feed_jobs
use_fair_price
#   class OracleJob.LpExchangeRateTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.LpExchangeRateTask
DESCRIPTOR
in_token_address
out_token_address
mercurial_pool_address
saber_pool_address
orca_pool_token_mint_address
raydium_pool_address
#   class OracleJob.RegexExtractTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.RegexExtractTask
DESCRIPTOR
pattern
group_number
#   class OracleJob.XStepPriceTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.XStepPriceTask
DESCRIPTOR
step_job
step_aggregator_pubkey
#   class OracleJob.TwapTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.TwapTask
DESCRIPTOR
aggregator_pubkey
period
weight_by_propagation_time
min_samples
ending_unix_timestamp
#   class OracleJob.SerumSwapTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.SerumSwapTask
DESCRIPTOR
serum_pool_address
#   class OracleJob.PowTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.PowTask
DESCRIPTOR
scalar
aggregator_pubkey
#   class OracleJob.LendingRateTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.LendingRateTask
DESCRIPTOR
protocol
asset_mint
field
Field
FIELD_DEPOSIT_RATE
FIELD_BORROW_RATE
#   class OracleJob.MangoPerpMarketTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.MangoPerpMarketTask
DESCRIPTOR
perp_market_address
#   class OracleJob.JupiterSwapTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.JupiterSwapTask
DESCRIPTOR
in_token_address
out_token_address
base_amount
#   class OracleJob.PerpMarketTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.PerpMarketTask
DESCRIPTOR
mango_market_address
drift_market_address
zeta_market_address
zo_market_address
#   class OracleJob.OracleTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.OracleTask
DESCRIPTOR
switchboard_address
pyth_address
pyth_allowed_confidence_interval
#   class OracleJob.AnchorFetchTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.AnchorFetchTask
DESCRIPTOR
program_id
account_address
#   class OracleJob.DefiKingdomsTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.DefiKingdomsTask
DESCRIPTOR
Token
provider
in_token
out_token
#   class OracleJob.TpsTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
job_schemas_pb2.TpsTask
DESCRIPTOR
#   class OracleJob.SplStakePoolTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.SplStakePoolTask
DESCRIPTOR
pubkey
#   class OracleJob.SplTokenParseTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.SplTokenParseTask
DESCRIPTOR
token_account_address
mint_address
#   class OracleJob.UniswapExchangeRateTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.UniswapExchangeRateTask
DESCRIPTOR
in_token_address
out_token_address
in_token_amount
slippage
provider
#   class OracleJob.SushiswapExchangeRateTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.SushiswapExchangeRateTask
DESCRIPTOR
in_token_address
out_token_address
in_token_amount
slippage
provider
#   class OracleJob.PancakeswapExchangeRateTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.PancakeswapExchangeRateTask
DESCRIPTOR
in_token_address
out_token_address
in_token_amount
slippage
provider
#   class OracleJob.CacheTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.CacheTask
DESCRIPTOR
name
method
Method
METHOD_GET
METHOD_SET
#   class OracleJob.SysclockOffsetTask(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.SysclockOffsetTask
DESCRIPTOR
#   class OracleJob.Task(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message):

A ProtocolMessage

Inherited Members
google.protobuf.pyext._message.CMessage
CMessage
MergeFrom
CopyFrom
Clear
SetInParent
IsInitialized
MergeFromString
ParseFromString
SerializeToString
SerializePartialToString
ListFields
HasField
ClearField
WhichOneof
HasExtension
ClearExtension
UnknownFields
DiscardUnknownFields
ByteSize
FromString
RegisterExtension
FindInitializationErrors
Extensions
job_schemas_pb2.Task
DESCRIPTOR
http_task
json_parse_task
median_task
mean_task
websocket_task
divide_task
multiply_task
lp_token_price_task
lp_exchange_rate_task
conditional_task
value_task
max_task
regex_extract_task
xstep_price_task
add_task
subtract_task
twap_task
serum_swap_task
pow_task
lending_rate_task
mango_perp_market_task
jupiter_swap_task
perp_market_task
oracle_task
anchor_fetch_task
defi_kingdoms_task
tps_task
spl_stake_pool_task
spl_token_parse_task
uniswap_exchange_rate_task
sushiswap_exchange_rate_task
pancakeswap_exchange_rate_task
cache_task
sysclock_offset_task
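
The task messages above are composed into an OracleJob: each OracleJob.Task wraps exactly one task variant from this list, and a job is an ordered list of tasks whose outputs feed into one another. A minimal usage sketch (not library source) follows; the tasks field of OracleJob and the url / path fields of HttpTask and JsonParseTask are assumptions about the protobuf schema, since they are not spelled out in this listing.

from switchboardpy import OracleJob

# Hypothetical two-step job: fetch a JSON document, then pull one value out of it.
# Field names `tasks`, `url`, and `path` are assumed from the OracleJob schema.
job = OracleJob(
    tasks=[
        OracleJob.Task(
            http_task=OracleJob.HttpTask(url="https://example.com/price.json")
        ),
        OracleJob.Task(
            json_parse_task=OracleJob.JsonParseTask(path="$.price")
        ),
    ]
)

# Protobuf messages serialize to bytes, which is the form a JobAccount stores.
serialized = job.SerializeToString()
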
#   class PermissionAccount:
View Source
class PermissionAccount:
    """A Switchboard account representing a permission or privilege granted by one
    account signer to another account.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This permission's public key
        keypair (Keypair | None): this permission's keypair
    """

    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    

    """
    Check if a specific permission is enabled on this permission account

    Args:
        permission (SwitchboardPermissionValue)

    Returns:
        bool: whether or not the permission is enabled
    """
    async def is_permission_enabled(self, permission: SwitchboardPermissionValue):
        perm_data = await self.load_data()
        permissions = perm_data.permissions
        return (permissions & permission) != 0

    """
    Load and parse PermissionAccount data based on the program IDL

    Args:
    
    Returns:
        PermissionAccountData: the deserialized permission account data

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await PermissionAccountData.fetch(self.program.provider.connection, self.public_key)


    """
    Get the size of a PermissionAccount on chain

    Args:

    Returns:
        int: size of the PermissionAccount type on chain
    """
    def size(self):
        return self.program.account["PermissionAccountData"].size

    """
    Create and initialize a PermissionAccount

    Args:
        program (anchor.Program)
        params (PermissionInitParams)

    Returns:
        PermissionAccount
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: PermissionInitParams):
        permission_account, permission_bump = PermissionAccount.from_seed(
            program,
            params.authority,
            params.granter,
            params.grantee
        )

        await program.rpc["permission_init"](
            {
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "permission": permission_account.public_key,
                    "authority": params.authority,
                    "granter": params.granter,
                    "grantee": params.grantee,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                },
            )
        )
        return permission_account

    """
    Loads a PermissionAccount from the expected PDA seed format

    Args:
        program (anchorpy.Program)
        authority (public_key): The authority pubkey to be incorporated into the account seed.
        granter (public_key): The granter pubkey to be incorporated into the account seed.
        grantee (public_key): The grantee pubkey to be incorporated into the account seed.

    Returns:
        Tuple[PermissionAccount, int]: PermissionAccount and PDA bump
    """
    @staticmethod
    def from_seed(program: anchorpy.Program, authority: PublicKey, granter: PublicKey, grantee: PublicKey):
        pubkey, bump = PublicKey.find_program_address(
            [
                bytes(b'PermissionAccountData'), 
                bytes(authority),
                bytes(granter),
                bytes(grantee)
            ],
            program.program_id
        )
    
        return PermissionAccount(AccountParams(program=program, public_key=pubkey)), bump

    """
    Sets the permission in the PermissionAccount

    Args: 
        params (PermissionSetParams)
    
    Returns:
        TransactionSignature
    """
    async def set(self, params: PermissionSetParams):
        await self.program.rpc["permission_set"](
            {
                "permission": self.program.type["SwitchboardPermission"][params.permission](),
                "authority": params.authority.public_key
            },
            ctx=anchorpy.Context(
                accounts={
                    "permission": self.public_key,
                    "authority": params.authority.public_key
                },
                signers=[params.authority]
            )
        )

A Switchboard account representing a permission or privilege granted by one account signer to another account.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This permission's public key
    keypair (Keypair | None): this permission's keypair

#   PermissionAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Check if a specific permission is enabled on this permission account

Args: permission (SwitchboardPermissionValue)

Returns: bool: whether or not the permission is enabled

#   async def is_permission_enabled( self, permission: switchboardpy.permission.SwitchboardPermissionValue ):
View Source
    async def is_permission_enabled(self, permission: SwitchboardPermissionValue):
        perm_data = await self.load_data()
        permissions = perm_data.permissions
        return (permissions & permission) != 0
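
A usage sketch (not library source), assuming an async context, a loaded anchorpy Program, and an existing permission account public key; the SwitchboardPermissionValue member used here is illustrative:

from switchboardpy import AccountParams, PermissionAccount
from switchboardpy.permission import SwitchboardPermissionValue

# `program` and `permission_pubkey` are assumed to exist already.
permission_account = PermissionAccount(
    AccountParams(program=program, public_key=permission_pubkey)
)
enabled = await permission_account.is_permission_enabled(
    SwitchboardPermissionValue.PERMIT_ORACLE_QUEUE_USAGE  # illustrative member name
)
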
#   async def load_data(self):
View Source
    async def load_data(self):
        return await PermissionAccountData.fetch(self.program.provider.connection, self.public_key)
#   def size(self):
View Source
    def size(self):
        return self.program.account["PermissionAccountData"].size
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.PermissionInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: PermissionInitParams):
        permission_account, permission_bump = PermissionAccount.from_seed(
            program,
            params.authority,
            params.granter,
            params.grantee
        )

        await program.rpc["permission_init"](
            {
                "permission_bump": permission_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "permission": permission_account.public_key,
                    "authority": params.authority,
                    "granter": params.granter,
                    "grantee": params.grantee,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "payer": program.provider.wallet.public_key
                },
            )
        )
        return permission_account
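
A sketch of creating a permission account (not library source); queue_authority, queue_pubkey, and aggregator_pubkey stand in for real public keys:

from switchboardpy import PermissionAccount, PermissionInitParams

permission_account = await PermissionAccount.create(
    program,
    PermissionInitParams(
        authority=queue_authority,    # pubkey allowed to set permissions later
        granter=queue_pubkey,         # account granting the permission
        grantee=aggregator_pubkey,    # account receiving the permission
    ),
)
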
#  
@staticmethod
def from_seed( program: anchorpy.program.core.Program, authority: solana.publickey.PublicKey, granter: solana.publickey.PublicKey, grantee: solana.publickey.PublicKey ):
View Source
    @staticmethod
    def from_seed(program: anchorpy.Program, authority: PublicKey, granter: PublicKey, grantee: PublicKey):
        pubkey, bump = PublicKey.find_program_address(
            [
                bytes(b'PermissionAccountData'), 
                bytes(authority),
                bytes(granter),
                bytes(grantee)
            ],
            program.program_id
        )
    
        return PermissionAccount(AccountParams(program=program, public_key=pubkey)), bump
#   async def set(self, params: switchboardpy.PermissionSetParams):
View Source
    async def set(self, params: PermissionSetParams):
        await self.program.rpc["permission_set"](
            {
                "permission": self.program.type["SwitchboardPermission"][params.permission](),
                "authority": params.authority.public_key
            },
            ctx=anchorpy.Context(
                accounts={
                    "permission": self.public_key,
                    "authority": params.authority.public_key
                },
                signers=[params.authority]
            )
        )
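
A sketch of toggling the permission afterwards; how the permission value maps onto the on-chain SwitchboardPermission enum is assumed here, and queue_authority_keypair is a placeholder Keypair:

from switchboardpy import PermissionSetParams
from switchboardpy.permission import SwitchboardPermission

await permission_account.set(
    PermissionSetParams(
        permission=SwitchboardPermission.PERMIT_ORACLE_QUEUE_USAGE,  # assumed enum member
        authority=queue_authority_keypair,  # Keypair controlling this permission account
        enable=True,
    )
)
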
#  
@dataclass
class PermissionInitParams:
View Source
@dataclass
class PermissionInitParams:

    """Pubkey of the account granting the permission"""
    granter: PublicKey

    """The receiving amount of a permission"""
    grantee: PublicKey

    """The authority that is allowed to set permissions for this account"""
    authority: PublicKey

Pubkey of the account granting the permission

#   PermissionInitParams( granter: solana.publickey.PublicKey, grantee: solana.publickey.PublicKey, authority: solana.publickey.PublicKey )
#   granter: solana.publickey.PublicKey

Pubkey of the account receiving the permission

#   grantee: solana.publickey.PublicKey

The authority that is allowed to set permissions for this account

#  
@dataclass
class PermissionSetParams:
View Source
@dataclass
class PermissionSetParams:

    """The permission to set"""
    permission: SwitchboardPermission

    """The authority controlling this permission"""
    authority: Keypair

    """Specifies whether to enable or disable the permission"""
    enable: bool

The permission to set

#   PermissionSetParams( permission: switchboardpy.permission.SwitchboardPermission, authority: solana.keypair.Keypair, enable: bool )
#   permission: switchboardpy.permission.SwitchboardPermission

The authority controlling this permission

#   authority: solana.keypair.Keypair

Specifies whether to enable or disable the permission

#   class ProgramStateAccount:
View Source
class ProgramStateAccount:
    """Account type representing Switchboard global program state.

    Attributes:
        program (anchor.Program): The anchor program ref
        public_key (PublicKey | None): This program's public key
        keypair (Keypair | None): this program's keypair
    """


    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
    
    """
    Constructs ProgramStateAccount from the static seed from which it was generated.

    Args:
        program (anchorpy.Program): The Anchor-loaded Switchboard program

    Returns:
        ProgramStateAccount and PDA bump tuple.
    """
    @staticmethod
    def from_seed(program: anchorpy.Program):
        state_pubkey, state_bump = publickey.PublicKey.find_program_address(['STATE'.encode()], program.program_id)
        return ProgramStateAccount(AccountParams(program=program, public_key=state_pubkey)), state_bump

    """
    Load and parse ProgramStateAccount state based on the program IDL. 
    
    Args:

    Returns:
        name (Any): data parsed in accordance with the
            Switchboard IDL.

    Raises:
        AccountDoesNotExistError: If the account doesn't exist.
        AccountInvalidDiscriminator: If the discriminator doesn't match the IDL.
    """
    async def load_data(self):
        return await SbState.fetch(self.program.provider.connection, self.public_key)


    """
    Fetch the Switchboard token mint specified in the program state account.
    
    Args:

    Returns:
        AsyncToken: the Switchboard token mint as an spl-token AsyncToken client
    """
    async def get_token_mint(self) -> AsyncToken:
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        state = await self.load_data()
        switch_token_mint = AsyncToken(self.program.provider.connection, state.token_mint, TOKEN_PROGRAM_ID, payer_keypair)
        return switch_token_mint

    """
    Get the size of the global ProgramStateAccount on chain
    
    Returns:
        int: size of the ProgramStateAccount on chain 
    """
    def size(self):
        return self.program.account["SbState"].size

    """
    Create and initialize the ProgramStateAccount

    Args:
        program (anchorpy.Program): anchor program
        params (ProgramInitParams): optionally pass in mint address

    Returns:
        ProgramStateAccount that was generated
    """
    @staticmethod
    async def create(program: anchorpy.Program, params: ProgramInitParams):
        payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key)
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        psa = ProgramStateAccount(AccountParams(program=program, public_key=state_account.public_key))
        try:
            await psa.load_data()
            return psa
        except Exception:
            pass
        mint = None
        vault = None
        if params.mint == None:
            decimals = 9
            mint, vault = await anchorpy.utils.token.create_mint_and_vault(
                program.provider,
                100_000_000,
                payer_keypair.public_key,
                decimals
            )
        else:
            mint = params.mint
            token = AsyncToken(
                program.provider.connection,
                mint,
                TOKEN_PROGRAM_ID,
                payer_keypair
            )
            vault = await token.create_account(payer_keypair.public_key)
        await program.rpc["program_init"](
            {
                "state_bump": state_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "state": state_account.public_key,
                    "authority": payer_keypair.public_key,
                    "token_mint": mint,
                    "vault": vault,
                    "payer": payer_keypair.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "token_program": TOKEN_PROGRAM_ID
                },
            )
        )

    """
    Transfer N tokens from the program vault to a specified account.

    Args:
        to (PublicKey): The recipient of the vault tokens.
        authority (Keypair): The vault authority required to sign the transfer tx
        params (VaultTransferParams): Specifies the amount to transfer.
  
    Returns:
        TransactionSignature
    """
    async def vault_transfer(self, to: PublicKey, authority: Keypair, params: VaultTransferParams):
        state_pubkey, state_bump = ProgramStateAccount.from_seed(self.program)
        state = await self.load_data()
        vault = state.token_vault
        await self.program.rpc["vault_transfer"](
            {
                "state_bump": state_bump,
                "amount": params.amount # @FIXME - can't be a decimal, must have mantissa / scale
            },
            ctx=anchorpy.Context(
                accounts={
                    "state": state_pubkey,
                    "to": to,
                    "vault": vault,
                    "authority": authority.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                },
                signers=[authority]
            )
        )

Account type representing Switchboard global program state.

Attributes:
    program (anchor.Program): The anchor program ref
    public_key (PublicKey | None): This program's public key
    keypair (Keypair | None): this program's keypair

#   ProgramStateAccount(params: switchboardpy.AccountParams)
View Source
    def __init__(self, params: AccountParams):
        if params.public_key is None and params.keypair is None:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        if params.keypair and params.public_key and params.keypair.public_key != params.public_key:
            raise ValueError('User must provide either a publicKey or keypair for account use.')
        self.program = params.program
        self.public_key = params.keypair.public_key if params.keypair else params.public_key
        self.keypair = params.keypair
#   keypair

Constructs ProgramStateAccount from the static seed from which it was generated.

Args: program (anchorpy.Program): The Anchor-loaded Switchboard program

Returns: ProgramStateAccount and PDA bump tuple.

#  
@staticmethod
def from_seed(program: anchorpy.program.core.Program):
View Source
    @staticmethod
    def from_seed(program: anchorpy.Program):
        state_pubkey, state_bump = publickey.PublicKey.find_program_address(['STATE'.encode()], program.program_id)
        return ProgramStateAccount(AccountParams(program=program, public_key=state_pubkey)), state_bump
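
A usage sketch (not library source), assuming program is an anchorpy Program loaded with the Switchboard IDL:

from switchboardpy import ProgramStateAccount

# Derive the singleton state account from the static seed and read it.
state_account, state_bump = ProgramStateAccount.from_seed(program)
state = await state_account.load_data()     # deserialized SbState
print(state.token_mint, state.token_vault)  # mint and vault referenced elsewhere in this class
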
#   async def load_data(self):
View Source
    async def load_data(self):
        return await SbState.fetch(self.program.provider.connection, self.public_key)
#   async def get_token_mint(self) -> spl.token.async_client.AsyncToken:
View Source
    async def get_token_mint(self) -> AsyncToken:
        payer_keypair = Keypair.from_secret_key(self.program.provider.wallet.payer.secret_key)
        state = await self.load_data()
        switch_token_mint = AsyncToken(self.program.provider.connection, state.token_mint, TOKEN_PROGRAM_ID, payer_keypair)
        return switch_token_mint
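
Continuing the sketch above, the mint can then be fetched as an spl-token client:

switch_token_mint = await state_account.get_token_mint()
print(switch_token_mint.pubkey)  # the Switchboard token mint address (AsyncToken attribute)
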
#   def size(self):
View Source
    def size(self):
        return self.program.account["SbState"].size
#  
@staticmethod
async def create( program: anchorpy.program.core.Program, params: switchboardpy.ProgramInitParams ):
View Source
    @staticmethod
    async def create(program: anchorpy.Program, params: ProgramInitParams):
        payer_keypair = Keypair.from_secret_key(program.provider.wallet.payer.secret_key)
        state_account, state_bump = ProgramStateAccount.from_seed(program)
        psa = ProgramStateAccount(AccountParams(program=program, public_key=state_account.public_key))
        try:
            await psa.load_data()
            return psa
        except Exception:
            pass
        mint = None
        vault = None
        if params.mint == None:
            decimals = 9
            mint, vault = await anchorpy.utils.token.create_mint_and_vault(
                program.provider,
                100_000_000,
                payer_keypair.public_key,
                decimals
            )
        else:
            mint = params.mint
            token = AsyncToken(
                program.provider.connection,
                mint,
                TOKEN_PROGRAM_ID,
                payer_keypair
            )
            vault = await token.create_account(payer_keypair.public_key)
        await program.rpc["program_init"](
            {
                "state_bump": state_bump
            },
            ctx=anchorpy.Context(
                accounts={
                    "state": state_account.public_key,
                    "authority": payer_keypair.public_key,
                    "token_mint": mint,
                    "vault": vault,
                    "payer": payer_keypair.public_key,
                    "system_program": system_program.SYS_PROGRAM_ID,
                    "token_program": TOKEN_PROGRAM_ID
                },
            )
        )
#   async def vault_transfer( self, to: solana.publickey.PublicKey, authority: solana.keypair.Keypair, params: switchboardpy.VaultTransferParams ):
View Source
    async def vault_transfer(self, to: PublicKey, authority: Keypair, params: VaultTransferParams):
        state_pubkey, state_bump = ProgramStateAccount.from_seed(self.program)
        state = await self.load_data()
        vault = state.token_vault
        await self.program.rpc["vault_transfer"](
            {
                "state_bump": state_bump,
                "amount": params.amount # @FIXME - can't be a decimal, must have mantissa / scale
            },
            ctx=anchorpy.Context(
                accounts={
                    "state": state_pubkey,
                    "to": to,
                    "vault": vault,
                    "authority": authority.public_key,
                    "token_program": TOKEN_PROGRAM_ID,
                },
                signers=[authority]
            )
        )
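
A sketch of moving tokens out of the vault (not library source); recipient_token_account and vault_authority_keypair are placeholders, and per the FIXME in the source above the exact amount encoding may need adjustment:

from decimal import Decimal
from switchboardpy import VaultTransferParams

await state_account.vault_transfer(
    to=recipient_token_account,          # token account receiving the funds
    authority=vault_authority_keypair,   # the vault authority must sign
    params=VaultTransferParams(amount=Decimal(100)),
)
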
#  
@dataclass
class ProgramInitParams:
View Source
@dataclass
class ProgramInitParams:

    """Optional token mint"""
    mint: PublicKey = None

Optional token mint

#   ProgramInitParams(mint: solana.publickey.PublicKey = None)
#   mint: solana.publickey.PublicKey = None
#  
@dataclass
class VaultTransferParams:
View Source
@dataclass
class VaultTransferParams:

    """Amount being transferred"""
    amount: Decimal

Amount being transferred

#   VaultTransferParams(amount: decimal.Decimal)
#  
@dataclass
class SwitchboardDecimal:
View Source
@dataclass
class SwitchboardDecimal:
    mantissa: int
    scale: int

    """
    Convert a BN.js-style number object into a SwitchboardDecimal

    Args:
        obj (Any): Object with integer fields scale and mantissa (hex val)

    Returns:
        sbd (SwitchboardDecimal): SwitchboardDecimal
    """
    @staticmethod
    def fromObj(obj: Any):
        return SwitchboardDecimal(
            mantissa=obj.mantissa,
            scale=obj.scale
        )
    
    def to_decimal(self, sbd: object):
        mantissa = Decimal(sbd.mantissa)
        scale = sbd.scale
        return mantissa / Decimal(10 ** scale)

    @staticmethod
    def from_decimal(dec: Decimal):
        _, digits, exponent = dec.as_tuple()
        integer = reduce(lambda rst, x: rst * 10 + x, digits)
        return SwitchboardDecimal(integer, exponent)

    # convert any switchboard-decimal-like object to a decimal
    @staticmethod
    def sbd_to_decimal(sbd: object) -> Decimal:
        mantissa = Decimal(sbd.mantissa)
        scale = sbd.scale
        return mantissa / Decimal(10 ** scale)

    # for sending as argument in transaction
    def as_proper_sbd(self, program: anchorpy.Program):
        return program.type['SwitchboardDecimal'](self.mantissa, self.scale)

    def __eq__(self, __o: object) -> bool:
        if not (hasattr(__o, 'mantissa') and hasattr(__o, 'scale')):
            return False
        return self.mantissa == __o.mantissa and self.scale == __o.scale

SwitchboardDecimal(mantissa: int, scale: int)

#   SwitchboardDecimal(mantissa: int, scale: int)
#   scale: int

Convert a BN.js-style number object into a SwitchboardDecimal

Args: obj (Any): Object with integer fields scale and mantissa (hex val)

Returns: sbd (SwitchboardDecimal): SwitchboardDecimal

#  
@staticmethod
def fromObj(obj: Any):
View Source
    @staticmethod
    def fromObj(obj: Any):
        return SwitchboardDecimal(
            mantissa=obj.mantissa,
            scale=obj.scale
        )
#   def to_decimal(self, sbd: object):
View Source
    def to_decimal(self, sbd: object):
        mantissa = Decimal(sbd.mantissa)
        scale = sbd.scale
        return mantissa / Decimal(10 ** scale)
#  
@staticmethod
def from_decimal(dec: decimal.Decimal):
View Source
    @staticmethod
    def from_decimal(dec: Decimal):
        _, digits, exponent = dec.as_tuple()
        integer = reduce(lambda rst, x: rst * 10 + x, digits)
        return SwitchboardDecimal(integer, exponent)
#  
@staticmethod
def sbd_to_decimal(sbd: object) -> decimal.Decimal:
View Source
    @staticmethod
    def sbd_to_decimal(sbd: object) -> Decimal:
        mantissa = Decimal(sbd.mantissa)
        scale = sbd.scale
        return mantissa / Decimal(10 ** scale)
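
For example, a feed value stored on chain with mantissa 1234567 and scale 6 converts back to a Decimal as follows (a small sketch using the helper above):

from decimal import Decimal
from switchboardpy import SwitchboardDecimal

sbd = SwitchboardDecimal(mantissa=1234567, scale=6)
value = SwitchboardDecimal.sbd_to_decimal(sbd)   # 1234567 / 10**6
assert value == Decimal("1.234567")
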
#   def as_proper_sbd(self, program: anchorpy.program.core.Program):
View Source
    def as_proper_sbd(self, program: anchorpy.Program):
        return program.type['SwitchboardDecimal'](self.mantissa, self.scale)
#   readRawVarint32
#   readDelimitedFrom