Compare commits

...

98 Commits

Author SHA1 Message Date
Daniel Chew cf7987f4c5
feat(target_chains/fuel): add governance contract (#1518)
* add governance contract

* add fuel ci

* add rust-toolchain

* add executes_governance_instruction test

* add test for SetValidPeriod

* add test for AuthorizeGovernanceDataSourceTransfer

* remove SetWormholeAddress

* add test for SetDataSources

* remove WormholeAddressSetEvent

* remove SetWormholeAddress

* remove SetWormholeAddressPayload

* remove SetWormholeAddressPayload and SetWormholeAddress imports

* remove GovernanceAction::SetWormholeAddress

* address comments

* refactor test

* add comments
2024-05-09 17:42:28 +01:00
Amin Moghaddam 6e0bd0569b
feat(contract_manager)Add option to test for all entropy contracts (#1559) 2024-05-09 12:36:14 +01:00
Pavel Strakhov 1e1be9dbeb
refactor(target_chains/starknet): remove old config instructions and owner from wormhole (#1558) 2024-05-09 09:42:03 +01:00
Pavel Strakhov d105a7aa86
refactor(target_chains/starknet): use EthAddress and is_eth_signature_valid (#1556) 2024-05-08 07:20:30 +01:00
Pavel Strakhov dd9b07b5e4
refactor(target_chains/starknet): generalize array_try_into (#1555) 2024-05-07 16:18:18 +01:00
Pavel Strakhov e26c9d1a30
refactor(target_chains/starknet): split pyth module (#1554) 2024-05-07 14:20:59 +01:00
Pavel Strakhov 9dddd3d1e7
feat(target_chains/starknet): handle wormhole guardian set upgrade VAA (#1550)
* feat(target_chains/starknet): handle wormhole guardian set upgrade VAA

* test(target_chains/starknet): add failing tests for governance

* doc(target_chains/starknet): add comment about wormhole governance
2024-05-07 10:33:09 +01:00
Dev Kalra 77c68c5069
fix(fortuna): watch blocks from start and infinite get queries (#1551)
* fix: watch blocks from start and infinite get queries

* formatting

* fix

* undo small change
2024-05-07 14:21:52 +05:30
Pavel Strakhov bf2c8b5d43
refactor(target_chains/starknet): blanket impl for unwrap_with_felt252 (#1549) 2024-05-07 08:04:55 +01:00
Reisen 3f07c27243
chore(aptos): bump to 3.1.0 toolchain/cli (#1543) 2024-05-06 16:36:01 +01:00
Pavel Strakhov 42b64ac09f
refactor(target_chains/starknet): remove Result from merkle_tree and pyth setters (#1548)
* refactor(target_chains/starknet): remove Result from merkle_tree

* refactor(target_chains/starknet): remove Result from pyth contract setters
2024-05-06 16:21:36 +01:00
Pavel Strakhov 55cbe62997
feat(target_chains/starknet): wormhole governance VAA verification (#1547)
* feat(target_chains/starknet): wormhole governance VAA verification

* refactor(target_chains/starknet): rename VM to VerifiedVM
2024-05-06 14:07:49 +01:00
Pavel Strakhov 94b36c4961
refactor(target_chains/starknet): remove Result from wormhole (#1541) 2024-05-06 11:27:28 +01:00
Jayant Krishnamurthy ff6b11023c
[price_pusher] Option to ignore gas objects (#1545)
* gr

* bump version
2024-05-03 21:41:14 -07:00
Jayant Krishnamurthy 4966b956df
[price_pusher] Sui pusher debugging messages (#1544)
* add logging

* version

* gr
2024-05-03 18:04:43 -07:00
Dev Kalra 10dc4a05b8
feat(fortuna_staging): use spans to create a hierarchy of logs (#1540)
* use spans to create a hierarchy of logs

* minor imp

* remove chain id

* add a sequence processing

* added comments

* consistent with other threads

* extract method out

* add field to process block range

* add field to watch events logs

* rename method

* extract process batch method

* tidy

* update log for eth

* remove comment

* update version

* address feedback
2024-05-03 21:27:42 +05:30
Ali Behjati 586a4398bd
feat(price_pusher): add more options to evm pusher (#1538)
This change makes gasLimit configurable and also adds an
updateFeeMultiplier which is useful in Hedera as they have an
inconsistency between the `value` passed in tx and the `value` on-chain.
2024-05-03 09:59:19 +02:00
guibescos 020ecdf5da
Solve (#1539) 2024-05-03 09:46:09 +02:00
Pavel Strakhov 308599714f
refactor(target_chains/starknet): remove Result from reader (#1536) 2024-05-02 15:48:55 +01:00
Dev Kalra 587a6fa524
feat(contract_manager): upgrade deploy scripts to use wormhole store (#1523)
* upgrade deploy scripts to use wormhole store

* deploy not find

* deploy price feed and entropy to taiko

* rename type

* rename method

* address feedback

* update js docs

* deploy to olive testnet

* pre commit

* rename deploy config to base deploy config
2024-05-02 16:48:16 +05:30
guibescos a592c6bc33
fix: publish workflow (#1532)
* dry run

* check

* Fix workflows

* rexport price feed message
2024-05-01 19:11:25 +01:00
Aditya Arora 31483a9fc7
chore-add-evm-new-chains (#1533) 2024-05-01 19:35:35 +02:00
cipherZ 3d9781ed58
Update README.md (#1526)
* Update README.md

changed method doc

* Update README.md

---------

Co-authored-by: guibescos <59208140+guibescos@users.noreply.github.com>
2024-05-01 15:20:02 +01:00
guibescos 344f8a9e47
feat: add anchor to pythnet sdk (#1531)
* add anchor to pythnet sdk

* bump

* bump

* bump

* please work

* Solve
2024-05-01 14:39:17 +01:00
Amin Moghaddam b2cb7c878a
fix(fortuna): Fortuna improvements (#1529)
* More logging on failure of deserialization
* Log chain id if the provider registration is failing
* Fix sample config
* Fix dysfunctional rpc address from blast
2024-05-01 13:19:37 +02:00
Pavel Strakhov 4e630edac0
feat(target_chains/starknet): fee collection (#1527)
* feat(target_chains/starknet): fee collection

* refactor(target_chains/starknet): renames and comments
2024-04-30 23:03:29 +01:00
guibescos 20d99bceb7
feat: enable remote stake delegation (#1501)
* feat: enable remote stake delegation

* Cleanup space

* Go

* feat: drive-by priority fees

* fix: pr comments

* fix: pr comments
2024-04-30 15:46:44 +01:00
Dev Kalra d2ce2ecd33
feat(fortuna): add a cli arg to run keeper (#1517)
* optional run keeper

* update comment

* instead of flag use the keeper key file

* update version

* rename method
2024-04-30 20:10:48 +05:30
guibescos 2095da34e9
feat: add input boxes (#1515)
* feat: add send usd app

* fix: cargo tomls

* fix: pre-commit

* fix: improve code quality

* fix: fix names and texts

* fix: pre-commit

* feat: add send usd example to monorepo

* fix: connection endpoint for send usd example

* fix: priceUpdateData

* fix: packages

* fix: remove unused variables

* fix: packages

* fix: test

* fix: tests

* fix: test

* remove file

* fix

* go

* Try removing test script

* Remove npm run test from the text

* Add input box

* is this the same

* pre-commit

---------

Co-authored-by: keyvan <keyvankhademi@gmail.com>
2024-04-30 14:07:05 +01:00
Pavel Strakhov a8dbabc7f9
feat(target_chains/starknet): add fee configuration (#1525) 2024-04-30 13:19:24 +01:00
Pavel Strakhov cae194eb62
Starknet: update_price_feeds and latest_price_info (#1482)
* feat(target_chains/starknet): update_price_feeds and latest_price_info

* test(target_chains/starknet): basic test for pyth contract

* chore(target_chains/starknet): update deploy script

* feat(target_chains/starknet): added get_price_unsafe and get_ema_price_unsafe

* refactor(target_chains/starknet): match on UpdateType and MessageType
2024-04-30 12:26:41 +01:00
NinaLua 8d32b4c2fc
chore: remove repetitive words (#1524) 2024-04-30 12:17:10 +01:00
Ali Behjati a203808a44
refactor(cosmwasm/tools): update cosmjs dependencies (#1514)
* refactor(cosmwasm/tools): update cosmjs dependencies

We needed to update cosmjs dependencies to support Xion, which is based
on a new CometBFT-based variation of tendermint. This change also
includes the artifacts for the Xion testnet network.

* fix: pin a dependency to get nextjs to work

* fix: address review comments
2024-04-29 19:34:57 +02:00
Aditya Arora 24a08a06c5
chore(pricefeed-evm-sdk): Improve doc comments (#1521) 2024-04-29 12:42:30 -04:00
Pavel Strakhov f212907a8b test(target_chains/starknet): add byte array tests 2024-04-29 14:51:06 +01:00
Ali Behjati ef922220ee
chore(contract_manager): Rename package to @pythnetwork/contract-manager (#1507)
This change renames the contract manager package name to @pythnetwork/contract-manager to be consistent with our package names.
2024-04-29 15:25:05 +02:00
Jayant Krishnamurthy 6da2e1ba53
[hermes] Add deprecation notices to old API methods (#1516)
* [hermes] Add deprecation notices for doc purposes

* bump version

* deprecation
2024-04-29 14:26:41 +02:00
Pavel Strakhov 050a3412f9 refactor(target_chains/starknet): move ByteArray to a separate module 2024-04-29 12:57:59 +01:00
Dev Kalra cf90bff236
feat(fortuna): support multiple hashchains (#1509)
* introduce provider config

* get provider chain config in order

* hash chain with multiple pebble chains

* update script to get metadata

* update version

* comments and move things around

* update comment

* minor fixes

* separate pr for this

* rename provider-config

* sample config

* auto sort commitments

* use seed and chain length

* refactor and simplify hashchain and offset vec

* better formatting

* make commitments private

* optional chain in provider-config

* set default value of chain length

* Version 5.0.0

* update comments

* version update

* optional provider config
2024-04-26 16:59:16 +05:30
Keyvan Khademi 37ee3b46bd
feat: add solana send usd example app (#1471)
* feat: add send usd app

* fix: cargo tomls

* fix: pre-commit

* fix: improve code quality

* fix: fix names and texts

* fix: pre-commit

* feat: add send usd example to monorepo

* fix: connection endpoint for send usd example

* fix: priceUpdateData

* fix: packages

* fix: remove unused variables

* fix: packages

* fix: test

* fix: tests

* fix: test

* remove file

* fix

* go

* Try removing test script

* Remove npm run test from the text

---------

Co-authored-by: Guillermo Bescos <g.bescos@yahoo.com>
2024-04-25 20:19:11 +01:00
Dev Kalra b47ee059d7
feat(contract-manager): implement a script to get the entropy current registration (#1512)
* write a script to get the current registration

* simplify

* correct description

* catch only rpc errors

* refactor and simplify
2024-04-25 21:00:39 +05:30
Daniel Chew c2da454637
add fuel contract by Fuel Labs (#1513) 2024-04-25 22:51:41 +09:00
Dev Kalra 567b4a6597
fix(fortuna/setup-provider): compare chain-length command line input (#1511)
* fix setup provider job

* only need to replace chain length input

* remove space
2024-04-25 17:41:06 +05:30
Dani Mehrjerdi 2014d1e205
feat(express-relay): Add simulation_failed to bid status (#1503) 2024-04-25 14:37:21 +04:00
Aditya Arora 93a71f2eef
pre-commit (#1510) 2024-04-25 09:15:51 +02:00
Dev Kalra 9437d51843
feat(contract_manager): add keeper balance to list entry (#1506)
* add keeper balance to list entry

* don't fix it as not sure
2024-04-24 16:37:14 +05:30
Dev Kalra d31cefb446
feat(contract_manager): separate store for wormhole (#1493)
* rename wormhole contract as per other names

* store for wormhole

* fix var name

* rename var

* rename contract based on other namings

* add yaml for aptos and cosmwasm
2024-04-24 16:32:16 +05:30
Dev Kalra 48a5faf4d9
specify only channel and date for the toolchain (#1505) 2024-04-24 15:59:40 +05:30
Keyvan Khademi b110bbca5c
feat(xc-admin-frontend): instructions summary in proposal page + improve ui in proposal row + refactor the code (#1478)
* refactor: move proposals to a folder

* refactor: use @images instead of relative paths

* refactor: split proposals into multiple files

* refactor: add type for proposal status

* refactor: add eslint and fix errors

* refactor: fix eslint errors

* refactor: fix eslint

* refactor: fix prettier

* refactor: remove any

* refactor: Proposals.tsx

* feat: add basic instructions summary

* feat: add unknown instruction

* fix: revert package-lock.json

* fix: update package-lock.json

* fix: pre-commit

* fix: ts error

* fix: remove message buffer dependency

* fix: revert back the cluster default

* feat: add support for different types of instructions

* feat: add transaction index to proposal row

* feat: improve the proposal row ui

* fix: display bigint properly (#1499)

---------

Co-authored-by: guibescos <59208140+guibescos@users.noreply.github.com>
2024-04-23 13:24:44 -07:00
Dev Kalra d05df508a8
deploy entropy on sei (#1500) 2024-04-23 20:58:06 +05:30
Aditya Arora d51e5712f4
redepoloyed (#1477) 2024-04-23 11:15:07 -04:00
Ali Behjati 1a3e3a7c00
refactor(hermes): match mapping address argument style with the rest (#1498) 2024-04-23 15:31:27 +02:00
Dev Kalra 4b8b9bfd87
feat(contract_manager): latency script for entropy v2 (#1494)
* latency script for entropy v2

* add block number difference

* correct desc

* refactor request randomness

* refactor and use chain as arg instead of contract

* unnecessary condition

* js doc

* correct desc

* use blockhash
2024-04-23 18:59:52 +05:30
Pavel Strakhov c7883c822b doc(target_chains/starknet): add readme 2024-04-23 13:05:10 +01:00
Pavel Strakhov b30604c5ba doc(target_chains/starknet): add local deployment script 2024-04-23 12:41:59 +01:00
Ali Behjati d50488ef5c
refactor(hermes): Change Pythnet mapping account env var (#1495)
Prefix the env var with Pythnet to be more clear.
2024-04-23 12:41:50 +02:00
Ali Behjati 64037e5b4a
fix(hermes): ignore no subscriber error on broadcast (#1492)
This change ignores the errors appearing when there are
no subsribers to price updates. The issue was fixed before
and was missed during the refactor.
2024-04-23 11:21:22 +02:00
guibescos 4445c73443
Go (#1489) 2024-04-22 20:44:10 +01:00
guibescos 5b494689d2
chore: sample configs for Solana pusher (#1491)
* Continue

* Sample configs
2024-04-22 20:43:59 +01:00
Dev Kalra e46821d423
feat(xc_admin_frontend): parse entropy and executor abis (#1481)
* parse entropy and executor abis

* correct import

* move parse to xc admin frontend

* undo change

* fix deps

* add comment

* comment

* revert changes and then some minor change

* fix unknown by typecast
2024-04-23 00:15:36 +05:30
Ali Behjati 644b54676c
fix(hermes): update cache.rs path in dockerignore (#1490)
Our dockerignore ignores all the files containing cache in their name
and hermes had an exception here. This change was missed in moving
Hermes around.
2024-04-22 19:42:14 +02:00
Ali Behjati f9292177e9
fix(hermes): reconnect on wh connection termination (#1488)
* fix(hermes): reconnect on wh connection termination

`tokio::select` disables the branch that runs the wh connection
if it returns OK and it never gets checked again. This change
changes the `run` return to never return OK.

* refactor(hermes): use Result<!> in pythnet network listener thread
2024-04-22 19:07:22 +02:00
guibescos 1b13bf651a
fix(solana_pusher): use processed to poll (#1486)
* Do it

* Do it
2024-04-22 17:41:49 +01:00
Pavel Strakhov e8c198065e feat(target_chains/starknet): add merkle tree utils 2024-04-22 17:38:31 +01:00
Pavel Strakhov a1e4fc0924 feat(target_chains/starknet): add utils for decoding signed integers, move array_felt252_to_bytes31 to utils 2024-04-22 17:38:31 +01:00
Anirudh Suresh 67132c0572
try python version env spec (#1484)
* try python version env spec

* Test it out on pull request

* test env change

* test env change 2

* test env change 3

* test env change 4

* address circular import

* test

* test

* test

* test

* test

* undoing test

---------

Co-authored-by: Amin Moghaddam <amin@pyth.network>
2024-04-22 12:25:20 -04:00
guibescos c7c3527bfe
fix: initialize guardian expiration to the right value (#1485) 2024-04-22 17:14:42 +01:00
Reisen 8b76d8c19a
refactor(hermes): state->aggregate downcasting (#1479) 2024-04-22 17:07:27 +01:00
Dani Mehrjerdi bdc2e967b0
refactor(express-relay): Update execution params hash data (#1476) 2024-04-20 11:25:11 +04:00
Pavel Strakhov e04edcfece fix(target_chains/starknet): make parse_and_verify_vm a read-only method 2024-04-19 17:48:31 +01:00
Pavel Strakhov ffbe02b4f6 fix(target_chains/starknet): verify new guardian set before writing to storage 2024-04-19 17:48:31 +01:00
Pavel Strakhov 26bbe4a0ef refactor(target_chains/starknet): check value in Hasher::push_num_bytes 2024-04-19 17:48:31 +01:00
Pavel Strakhov 8b66d0f814 refactor(target_chains/starknet): use enum errors 2024-04-19 17:48:31 +01:00
Pavel Strakhov 0a219fbead refactor(target_chains/starknet): errors modules, reexport errors 2024-04-19 17:48:31 +01:00
Pavel Strakhov 30c741ed49 feat(target_chains/starknet): add multi-purpose keccak hasher 2024-04-19 17:48:31 +01:00
Pavel Strakhov 5fac32fa40 chore(target_chains/starknet): add workflow 2024-04-19 17:48:31 +01:00
Pavel Strakhov 6e62328528 test(target_chains/starknet): add wormhole contract tests 2024-04-19 17:48:31 +01:00
Pavel Strakhov 2d9c6d3028 feat(target_chains/starknet): wormhole VAA verification and parsing 2024-04-19 17:48:31 +01:00
guibescos 508de75839
chore: fix ci (#1475)
* Fix ci

* cleanup

* Go
2024-04-19 17:46:53 +01:00
Dev Kalra 7bbcfa80d4
feat(fortuna): improve logging (#1472)
* update existing logs in keeper

* handle provider error

* handle implicit errors

* address feedback
2024-04-19 22:08:24 +05:30
guibescos 0d6c35fce8
feat: propagate errors in send transactions (#1469)
* Go

* Go

* Propagate errors

* lint

* Bump version
2024-04-19 16:57:00 +01:00
Diyahir c58b675a63
Force Node version (#1473) 2024-04-19 15:40:55 +01:00
Aditya Arora 899a995e2e
sei_devnet_redeploy (#1474) 2024-04-19 10:38:30 -04:00
guibescos 5a676978db
feat: add pyth_push_oracle to dockerfile (#1470)
* Add pusher to dcokerfile

* Update
2024-04-19 15:36:25 +01:00
Anirudh Suresh 3f6a14897d
fix per sdk pypi workflow (#1454)
* python3.12 -> python3.11?

* test poetry build works

* add back the publishing

---------

Co-authored-by: ani <ani@Anirudhs-MacBook-Pro.local>
2024-04-19 10:08:11 -04:00
Ali Behjati 481a428e88
chore(contract_manager): add scripts and changes to upgrade wh guardi… (#1468)
* chore(contract_manager): add scripts and changes to upgrade wh guardian set

This change adds `sync_wormhole_guardian_set.ts` script to update all of
our evm and cosmwasm contracts and has a couple of fixes in different
places to make sure everything works fine.

* fix: address review comments
2024-04-19 10:52:12 +02:00
Jayant Krishnamurthy 3f58a2a8b3
[price_pusher] add shebang command (#1467) 2024-04-18 12:11:40 -07:00
Keyvan Khademi 76205745c8
feat: add solana target chain example (#1446)
* feat: add send usd example for solana

* fix: rust fmt

* fix: remove unused dependency ahash

* fix: imports

* refactor: use get_price_no_older_than

* fix: package name

* fix: fix naming conventions

* feat: add workspace members in Anchor.toml

* fix: set maximum age to 1 hour

* fix: use public crates for send usd example
2024-04-18 08:11:50 -07:00
Dani Mehrjerdi 93efd61ea4
feat!(express_relay): Update bid's signature to eip712 (#1455) 2024-04-18 18:55:18 +04:00
Reisen 8be6a9ad1c
refactor(price_pusher): re-organize in monorepo (#1464)
* refactor(price_pusher): re-organize in monorepo

* revert(price_pusher): undo gitignore
2024-04-18 13:34:46 +01:00
guibescos 76ec4e3322
feat: move VAA_SPLIT_INDEX (#1466)
* feat: move VAA_SPLIT_INDEX

* CI
2024-04-17 18:14:11 +01:00
guibescos 56cbace282
feat: encourage using random treasury id (#1465)
* Encourage random treasury id

* GO
2024-04-17 17:47:14 +01:00
Reisen ba435bac76 refactor(fortuna): re-organize in monorepo 2024-04-17 15:21:57 +01:00
guibescos 73798b9bdd
Add 4th guardian set (#1457) 2024-04-17 15:01:32 +01:00
Dev Kalra 02ad78bcf1
Update coinflip contracts to use entropy v2 (#1461)
* update coinflip contracts to use entropy v2

* correct formatting
2024-04-17 19:27:33 +05:30
guibescos 8d92ad9931
feat: support closing vaas (#1460)
* feat: support closing vaas

* Go

* Max out

* Cleanup

* Refactor, add comments

* Add max

* Remove script

* bump solana utils

* Revert "Fix: guardian set (#1459)"

This reverts commit d9c85d8f9d.

* Update compute budget

* Go

* Restore

* Bump
2024-04-17 14:49:06 +01:00
Aditya Arora c12a58e0e4
Bugfix: price-service: Add bn.js dependency (#1458)
* price-service-added-bn.js

* requested changes
2024-04-17 09:30:34 -04:00
Amin Moghaddam ee1d61ac71
Fix js checks (#1462) 2024-04-17 09:45:17 +02:00
354 changed files with 42370 additions and 12067 deletions

View File

@ -15,5 +15,4 @@
.git .git
hermes/wormhole !apps/hermes/src/state/cache.rs
!hermes/src/state/cache.rs

View File

@ -21,10 +21,10 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Download CLI - name: Download CLI
run: wget https://github.com/aptos-labs/aptos-core/releases/download/aptos-cli-v1.0.4/aptos-cli-1.0.4-Ubuntu-22.04-x86_64.zip run: wget https://github.com/aptos-labs/aptos-core/releases/download/aptos-cli-v3.1.0/aptos-cli-3.1.0-Ubuntu-22.04-x86_64.zip
- name: Unzip CLI - name: Unzip CLI
run: unzip aptos-cli-1.0.4-Ubuntu-22.04-x86_64.zip run: unzip aptos-cli-3.1.0-Ubuntu-22.04-x86_64.zip
- name: Run tests - name: Run tests
run: ./aptos move test run: ./aptos move test

View File

@ -2,10 +2,10 @@ name: Check Fortuna
on: on:
pull_request: pull_request:
paths: [fortuna/**] paths: [apps/fortuna/**]
push: push:
branches: [main] branches: [main]
paths: [fortuna/**] paths: [apps/fortuna/**]
jobs: jobs:
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -17,4 +17,4 @@ jobs:
toolchain: nightly-2023-07-23 toolchain: nightly-2023-07-23
override: true override: true
- name: Run executor tests - name: Run executor tests
run: cargo test --manifest-path ./fortuna/Cargo.toml run: cargo test --manifest-path ./apps/fortuna/Cargo.toml

35
.github/workflows/ci-fuel-contract.yml vendored Normal file
View File

@ -0,0 +1,35 @@
name: Test Fuel Contract
on:
pull_request:
paths:
- target_chains/fuel/**
push:
branches:
- main
paths:
- target_chains/fuel/**
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
defaults:
run:
working-directory: target_chains/fuel/contracts/
steps:
- uses: actions/checkout@v2
- name: Install Fuel toolchain
run: |
curl https://install.fuel.network | sh
echo "$HOME/.fuelup/bin" >> $GITHUB_PATH
- name: Build with Forc
run: forc build --verbose
- name: Run tests with Forc
run: forc test --verbose
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

View File

@ -0,0 +1,37 @@
name: Starknet contract
on:
pull_request:
paths:
- target_chains/starknet/contracts/**
push:
branches:
- main
paths:
- target_chains/starknet/contracts/**
jobs:
check:
name: Starknet Foundry tests
runs-on: ubuntu-latest
defaults:
run:
working-directory: target_chains/starknet/contracts/
steps:
- uses: actions/checkout@v3
- name: Install Scarb
uses: software-mansion/setup-scarb@v1
with:
tool-versions: target_chains/starknet/contracts/.tool-versions
- name: Install Starknet Foundry
uses: foundry-rs/setup-snfoundry@v3
with:
tool-versions: target_chains/starknet/contracts/.tool-versions
- name: Install Starkli
run: curl https://get.starkli.sh | sh && . ~/.config/.starkli/env && starkliup -v $(awk '/starkli/{print $2}' .tool-versions)
- name: Install Katana
run: curl -L https://install.dojoengine.org | bash && PATH="$PATH:$HOME/.config/.dojo/bin" dojoup -v $(awk '/dojo/{print $2}' .tool-versions)
- name: Check formatting
run: scarb fmt --check
- name: Run tests
run: snforge test
- name: Test local deployment script
run: bash -c 'PATH="$PATH:$HOME/.config/.dojo/bin" katana & . ~/.config/.starkli/env && deploy/local_deploy'

View File

@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-node@v2 - uses: actions/setup-node@v2
with: with:
node-version: "16" node-version: "18"
registry-url: "https://registry.npmjs.org" registry-url: "https://registry.npmjs.org"
- run: npm ci - run: npm ci
- run: npx lerna run build --no-private - run: npx lerna run build --no-private

View File

@ -11,8 +11,14 @@ jobs:
steps: steps:
- name: Checkout sources - name: Checkout sources
uses: actions/checkout@v2 uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
default: true
profile: minimal
- run: cargo publish --token ${CARGO_REGISTRY_TOKEN} - run: cargo +stable-x86_64-unknown-linux-gnu publish --token ${CARGO_REGISTRY_TOKEN}
env: env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
working-directory: "target_chains/solana/pyth_solana_receiver_sdk" working-directory: "target_chains/solana/pyth_solana_receiver_sdk"

View File

@ -46,7 +46,7 @@ jobs:
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with: with:
context: . context: .
file: "./fortuna/Dockerfile" file: "./apps/fortuna/Dockerfile"
push: true push: true
tags: ${{ steps.metadata_fortuna.outputs.tags }} tags: ${{ steps.metadata_fortuna.outputs.tags }}
labels: ${{ steps.metadata_fortuna.outputs.labels }} labels: ${{ steps.metadata_fortuna.outputs.labels }}

View File

@ -40,7 +40,7 @@ jobs:
id: ecr_login id: ecr_login
- run: | - run: |
DOCKER_BUILDKIT=1 docker build -t lerna -f Dockerfile.lerna . DOCKER_BUILDKIT=1 docker build -t lerna -f Dockerfile.lerna .
DOCKER_BUILDKIT=1 docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -f price_pusher/Dockerfile . DOCKER_BUILDKIT=1 docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -f apps/price_pusher/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
env: env:
ECR_REGISTRY: public.ecr.aws ECR_REGISTRY: public.ecr.aws

View File

@ -5,12 +5,17 @@ on:
tags: tags:
- "python-v*" - "python-v*"
env:
PYTHON_VERSION: "3.11"
jobs: jobs:
deploy: deploy:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions/setup-python@v2 - uses: actions/setup-python@v2
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install dependencies - name: Install dependencies
run: | run: |
python3 -m pip install --upgrade poetry python3 -m pip install --upgrade poetry

1
.npmrc Normal file
View File

@ -0,0 +1 @@
engine-strict=true

View File

@ -60,9 +60,9 @@ repos:
- id: cargo-fmt-fortuna - id: cargo-fmt-fortuna
name: Cargo format for Fortuna name: Cargo format for Fortuna
language: "rust" language: "rust"
entry: cargo +nightly-2023-07-23 fmt --manifest-path ./fortuna/Cargo.toml --all -- --config-path rustfmt.toml entry: cargo +nightly-2023-07-23 fmt --manifest-path ./apps/fortuna/Cargo.toml --all -- --config-path rustfmt.toml
pass_filenames: false pass_filenames: false
files: fortuna files: apps/fortuna
# Hooks for message buffer contract # Hooks for message buffer contract
- id: cargo-fmt-message-buffer - id: cargo-fmt-message-buffer
name: Cargo format for message buffer contract name: Cargo format for message buffer contract
@ -80,13 +80,13 @@ repos:
- id: cargo-fmt-pythnet-sdk - id: cargo-fmt-pythnet-sdk
name: Cargo format for pythnet SDK name: Cargo format for pythnet SDK
language: "rust" language: "rust"
entry: cargo +nightly-2023-07-23 fmt --manifest-path ./pythnet/pythnet_sdk/Cargo.toml --all -- --config-path rustfmt.toml entry: cargo +nightly-2024-03-26 fmt --manifest-path ./pythnet/pythnet_sdk/Cargo.toml --all -- --config-path rustfmt.toml
pass_filenames: false pass_filenames: false
files: pythnet/pythnet_sdk files: pythnet/pythnet_sdk
- id: cargo-clippy-pythnet-sdk - id: cargo-clippy-pythnet-sdk
name: Cargo clippy for pythnet SDK name: Cargo clippy for pythnet SDK
language: "rust" language: "rust"
entry: cargo +nightly-2023-07-23 clippy --manifest-path ./pythnet/pythnet_sdk/Cargo.toml --tests --fix --allow-dirty --allow-staged -- -D warnings entry: cargo +nightly-2024-03-26 clippy --manifest-path ./pythnet/pythnet_sdk/Cargo.toml --tests --fix --allow-dirty --allow-staged -- -D warnings
pass_filenames: false pass_filenames: false
files: pythnet/pythnet_sdk files: pythnet/pythnet_sdk
# Hooks for solana receiver contract # Hooks for solana receiver contract

View File

@ -79,10 +79,11 @@ Lerna has some common failure modes that you may encounter:
1. `npm ci` fails with a typescript compilation error about a missing package. 1. `npm ci` fails with a typescript compilation error about a missing package.
This error likely means that the failing package has a `prepare` entry compiling the typescript in its `package.json`. This error likely means that the failing package has a `prepare` entry compiling the typescript in its `package.json`.
Fix this error by moving that logic to the `prepublishOnly` entry. Fix this error by moving that logic to the `prepublishOnly` entry.
1. The software builds locally but fails in CI, or vice-versa. 2. The software builds locally but fails in CI, or vice-versa.
This error likely means that some local build caches need to be cleaned. This error likely means that some local build caches need to be cleaned.
The build error may not indicate that this is a caching issue, e.g., it may appear that the packages are being built in the wrong order. The build error may not indicate that this is a caching issue, e.g., it may appear that the packages are being built in the wrong order.
Delete `node_modules/`, `lib/` and `tsconfig.tsbuildinfo` from each package's subdirectory. then try again. Delete `node_modules/`, `lib/` and `tsconfig.tsbuildinfo` from each package's subdirectory. then try again.
3. `npm ci` fails due to wrong node version. Make sure to be using `v18`. Node version `v21` is not supported and known to cause issues.
## Audit / Feature Status ## Audit / Feature Status

View File

@ -1,4 +1,4 @@
/target /target
config.yaml *config.yaml
*secret* *secret*
*private-key* *private-key*

View File

@ -1488,7 +1488,7 @@ dependencies = [
[[package]] [[package]]
name = "fortuna" name = "fortuna"
version = "4.0.0" version = "5.2.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
@ -2822,7 +2822,7 @@ dependencies = [
[[package]] [[package]]
name = "pythnet-sdk" name = "pythnet-sdk"
version = "2.0.0" version = "2.1.0"
dependencies = [ dependencies = [
"bincode", "bincode",
"borsh", "borsh",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "fortuna" name = "fortuna"
version = "4.0.0" version = "5.2.2"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -14,9 +14,9 @@ clap = { version = "4.4.6", features = ["derive", "cargo", "env"] }
ethabi = "18.0.0" ethabi = "18.0.0"
ethers = { version = "2.0.14", features = ["ws"] } ethers = { version = "2.0.14", features = ["ws"] }
futures = { version = "0.3.28" } futures = { version = "0.3.28" }
hex = "0.4.3" hex = "0.4.3"
prometheus-client = { version = "0.21.2" } prometheus-client = { version = "0.21.2" }
pythnet-sdk = { path = "../pythnet/pythnet_sdk", features = ["strum"] } pythnet-sdk = { path = "../../pythnet/pythnet_sdk", features = ["strum"] }
rand = "0.8.5" rand = "0.8.5"
reqwest = { version = "0.11.22", features = ["json", "blocking"] } reqwest = { version = "0.11.22", features = ["json", "blocking"] }
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }

View File

@ -7,15 +7,15 @@ RUN rustup default nightly-2023-07-23
# Build # Build
WORKDIR /src WORKDIR /src
COPY fortuna fortuna COPY apps/fortuna apps/fortuna
COPY pythnet pythnet COPY pythnet pythnet
COPY target_chains/ethereum/entropy_sdk/solidity/abis target_chains/ethereum/entropy_sdk/solidity/abis COPY target_chains/ethereum/entropy_sdk/solidity/abis target_chains/ethereum/entropy_sdk/solidity/abis
WORKDIR /src/fortuna WORKDIR /src/apps/fortuna
RUN --mount=type=cache,target=/root/.cargo/registry cargo build --release RUN --mount=type=cache,target=/root/.cargo/registry cargo build --release
FROM rust:${RUST_VERSION} FROM rust:${RUST_VERSION}
# Copy artifacts from other images # Copy artifacts from other images
COPY --from=build /src/fortuna/target/release/fortuna /usr/local/bin/ COPY --from=build /src/apps/fortuna/target/release/fortuna /usr/local/bin/

View File

@ -4,3 +4,4 @@ chains:
contract_addr: 0x8250f4aF4B972684F7b336503E2D6dFeDeB1487a contract_addr: 0x8250f4aF4B972684F7b336503E2D6dFeDeB1487a
reveal_delay_blocks: 0 reveal_delay_blocks: 0
legacy_tx: true legacy_tx: true
gas_limit: 500000

View File

@ -0,0 +1,7 @@
chains:
lightlink-pegasus:
commitments:
# prettier-ignore
- seed: [219,125,217,197,234,88,208,120,21,181,172,143,239,102,41,233,167,212,237,106,37,255,184,165,238,121,230,155,116,158,173,48]
chain_length: 10000
original_commitment_sequence_number: 104

View File

@ -0,0 +1 @@
nightly-2023-07-23

View File

@ -59,7 +59,7 @@ use {
// contract in the same repo. // contract in the same repo.
abigen!( abigen!(
PythRandom, PythRandom,
"../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json" "../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
); );
pub type SignablePythContract = PythRandom< pub type SignablePythContract = PythRandom<
@ -277,7 +277,11 @@ impl EntropyReader for PythContract {
Err(e) => match e { Err(e) => match e {
ContractError::ProviderError { e } => Err(anyhow!(e)), ContractError::ProviderError { e } => Err(anyhow!(e)),
_ => { _ => {
tracing::info!("Gas estimation for reveal with callback failed: {:?}", e); tracing::info!(
sequence_number = sequence_number,
"Gas estimation failed. error: {:?}",
e
);
Ok(None) Ok(None)
} }
}, },

View File

@ -8,7 +8,9 @@ use {
chain::ethereum::PythContract, chain::ethereum::PythContract,
command::register_provider::CommitmentMetadata, command::register_provider::CommitmentMetadata,
config::{ config::{
Commitment,
Config, Config,
ProviderConfig,
RunOptions, RunOptions,
}, },
keeper, keeper,
@ -27,7 +29,6 @@ use {
collections::HashMap, collections::HashMap,
net::SocketAddr, net::SocketAddr,
sync::Arc, sync::Arc,
vec,
}, },
tokio::{ tokio::{
spawn, spawn,
@ -121,38 +122,67 @@ pub async fn run_keeper(
pub async fn run(opts: &RunOptions) -> Result<()> { pub async fn run(opts: &RunOptions) -> Result<()> {
let config = Config::load(&opts.config.config)?; let config = Config::load(&opts.config.config)?;
let private_key = opts.load_private_key()?; let provider_config = opts
.provider_config
.provider_config
.as_ref()
.map(|path| ProviderConfig::load(&path).expect("Failed to load provider config"));
let secret = opts.randomness.load_secret()?; let secret = opts.randomness.load_secret()?;
let (tx_exit, rx_exit) = watch::channel(false); let (tx_exit, rx_exit) = watch::channel(false);
let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new(); let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
for (chain_id, chain_config) in &config.chains { for (chain_id, chain_config) in &config.chains {
let contract = Arc::new(PythContract::from_config(&chain_config)?); let contract = Arc::new(PythContract::from_config(&chain_config)?);
let provider_info = contract.get_provider_info(opts.provider).call().await?; let provider_chain_config = provider_config
.as_ref()
.and_then(|c| c.get_chain_config(chain_id));
let mut provider_commitments = provider_chain_config
.as_ref()
.map(|c| c.get_sorted_commitments())
.unwrap_or_else(|| Vec::new());
let provider_info = contract.get_provider_info(opts.provider).call().await?;
let latest_metadata =
bincode::deserialize::<CommitmentMetadata>(&provider_info.commitment_metadata)
.map_err(|e| {
anyhow!(
"Chain: {} - Failed to deserialize commitment metadata: {}",
&chain_id,
e
)
})?;
provider_commitments.push(Commitment {
seed: latest_metadata.seed,
chain_length: latest_metadata.chain_length,
original_commitment_sequence_number: provider_info.original_commitment_sequence_number,
});
// Reconstruct the hash chain based on the metadata and check that it matches the on-chain commitment.
// TODO: we should instantiate the state here with multiple hash chains.
// This approach works fine as long as we haven't rotated the commitment (i.e., all user requests
// are for the most recent chain).
// TODO: we may want to load the hash chain in a lazy/fault-tolerant way. If there are many blockchains, // TODO: we may want to load the hash chain in a lazy/fault-tolerant way. If there are many blockchains,
// then it's more likely that some RPC fails. We should tolerate these faults and generate the hash chain // then it's more likely that some RPC fails. We should tolerate these faults and generate the hash chain
// later when a user request comes in for that chain. // later when a user request comes in for that chain.
let metadata =
bincode::deserialize::<CommitmentMetadata>(&provider_info.commitment_metadata)?;
let hash_chain = PebbleHashChain::from_config( let mut offsets = Vec::<usize>::new();
&secret, let mut hash_chains = Vec::<PebbleHashChain>::new();
&chain_id,
&opts.provider, for commitment in &provider_commitments {
&chain_config.contract_addr, let offset = commitment.original_commitment_sequence_number.try_into()?;
&metadata.seed, offsets.push(offset);
metadata.chain_length,
)?; let pebble_hash_chain = PebbleHashChain::from_config(
&secret,
&chain_id,
&opts.provider,
&chain_config.contract_addr,
&commitment.seed,
commitment.chain_length,
)?;
hash_chains.push(pebble_hash_chain);
}
let chain_state = HashChainState { let chain_state = HashChainState {
offsets: vec![provider_info offsets,
.original_commitment_sequence_number hash_chains,
.try_into()?],
hash_chains: vec![hash_chain],
}; };
if chain_state.reveal(provider_info.original_commitment_sequence_number)? if chain_state.reveal(provider_info.original_commitment_sequence_number)?
@ -187,7 +217,10 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
Ok::<(), Error>(()) Ok::<(), Error>(())
}); });
spawn(run_keeper(chains.clone(), config, private_key));
if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
spawn(run_keeper(chains.clone(), config, keeper_private_key));
}
run_api(opts.addr.clone(), chains, rx_exit).await?; run_api(opts.addr.clone(), chains, rx_exit).await?;

View File

@ -16,7 +16,10 @@ use {
PebbleHashChain, PebbleHashChain,
}, },
}, },
anyhow::Result, anyhow::{
anyhow,
Result,
},
ethers::{ ethers::{
abi::Bytes as AbiBytes, abi::Bytes as AbiBytes,
signers::{ signers::{
@ -66,7 +69,14 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> {
register = true; register = true;
} else { } else {
let metadata = let metadata =
bincode::deserialize::<CommitmentMetadata>(&provider_info.commitment_metadata)?; bincode::deserialize::<CommitmentMetadata>(&provider_info.commitment_metadata)
.map_err(|e| {
anyhow!(
"Chain: {} - Failed to deserialize commitment metadata: {}",
&chain_id,
e
)
})?;
let hash_chain = PebbleHashChain::from_config( let hash_chain = PebbleHashChain::from_config(
&secret, &secret,
@ -74,7 +84,7 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> {
&provider_address, &provider_address,
&chain_config.contract_addr, &chain_config.contract_addr,
&metadata.seed, &metadata.seed,
metadata.chain_length, opts.randomness.chain_length,
)?; )?;
let chain_state = HashChainState { let chain_state = HashChainState {
offsets: vec![provider_info offsets: vec![provider_info
@ -105,7 +115,8 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> {
fee: opts.fee, fee: opts.fee,
uri, uri,
}) })
.await?; .await
.map_err(|e| anyhow!("Chain: {} - Failed to register provider: {}", &chain_id, e))?;
tracing::info!("{}: registered", &chain_id); tracing::info!("{}: registered", &chain_id);
} else { } else {
if provider_info.fee_in_wei != opts.fee { if provider_info.fee_in_wei != opts.fee {

View File

@ -97,7 +97,7 @@ pub struct RandomnessOptions {
/// The length of the hash chain to generate. /// The length of the hash chain to generate.
#[arg(long = "chain-length")] #[arg(long = "chain-length")]
#[arg(env = "FORTUNA_CHAIN_LENGTH")] #[arg(env = "FORTUNA_CHAIN_LENGTH")]
#[arg(default_value = "10000")] #[arg(default_value = "100000")]
pub chain_length: u64, pub chain_length: u64,
} }
@ -158,3 +158,57 @@ pub struct EthereumConfig {
/// The gas limit to use for entropy callback transactions. /// The gas limit to use for entropy callback transactions.
pub gas_limit: U256, pub gas_limit: U256,
} }
#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Provider Config Options")]
#[group(id = "ProviderConfig")]
pub struct ProviderConfigOptions {
#[arg(long = "provider-config")]
#[arg(env = "FORTUNA_PROVIDER_CONFIG")]
pub provider_config: Option<String>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ProviderConfig {
pub chains: HashMap<ChainId, ProviderChainConfig>,
}
impl ProviderConfig {
pub fn load(path: &str) -> Result<ProviderConfig> {
// Open and read the YAML file
let yaml_content = fs::read_to_string(path)?;
let config: ProviderConfig = serde_yaml::from_str(&yaml_content)?;
Ok(config)
}
/// Get the provider chain config. The method returns an Option for ProviderChainConfig.
/// We may not have past any commitments for a chain. For example, for a new chain
pub fn get_chain_config(&self, chain_id: &ChainId) -> Option<ProviderChainConfig> {
self.chains.get(chain_id).map(|x| x.clone())
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ProviderChainConfig {
commitments: Vec<Commitment>,
}
impl ProviderChainConfig {
/// Returns a clone of the commitments in the sorted order.
/// `HashChainState` requires offsets to be in order.
pub fn get_sorted_commitments(&self) -> Vec<Commitment> {
let mut sorted_commitments = self.commitments.clone();
sorted_commitments.sort_by(|c1, c2| {
c1.original_commitment_sequence_number
.cmp(&c2.original_commitment_sequence_number)
});
sorted_commitments
}
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Commitment {
pub seed: [u8; 32],
pub chain_length: u64,
pub original_commitment_sequence_number: u64,
}

View File

@ -1,6 +1,7 @@
use { use {
crate::config::{ crate::config::{
ConfigOptions, ConfigOptions,
ProviderConfigOptions,
RandomnessOptions, RandomnessOptions,
}, },
anyhow::Result, anyhow::Result,
@ -18,6 +19,9 @@ pub struct RunOptions {
#[command(flatten)] #[command(flatten)]
pub config: ConfigOptions, pub config: ConfigOptions,
#[command(flatten)]
pub provider_config: ProviderConfigOptions,
#[command(flatten)] #[command(flatten)]
pub randomness: RandomnessOptions, pub randomness: RandomnessOptions,
@ -32,16 +36,20 @@ pub struct RunOptions {
#[arg(env = "FORTUNA_PROVIDER")] #[arg(env = "FORTUNA_PROVIDER")]
pub provider: Address, pub provider: Address,
/// Path to a file containing a 20-byte (40 char) hex encoded Ethereum private key. /// If provided, the keeper will run alongside the Fortuna API service.
/// It should be a path to a file containing a 20-byte (40 char) hex encoded Ethereum private key.
/// This key is required to submit transactions for entropy callback requests. /// This key is required to submit transactions for entropy callback requests.
/// This key should not be a registered provider. /// This key should not be a registered provider.
#[arg(long = "keeper-private-key")] #[arg(long = "keeper-private-key")]
#[arg(env = "KEEPER_PRIVATE_KEY")] #[arg(env = "KEEPER_PRIVATE_KEY")]
pub keeper_private_key_file: String, pub keeper_private_key_file: Option<String>,
} }
impl RunOptions { impl RunOptions {
pub fn load_private_key(&self) -> Result<String> { pub fn load_keeper_private_key(&self) -> Result<Option<String>> {
return Ok((fs::read_to_string(&self.keeper_private_key_file))?); if let Some(ref keeper_private_key_file) = self.keeper_private_key_file {
return Ok(Some(fs::read_to_string(keeper_private_key_file)?));
}
return Ok(None);
} }
} }

487
apps/fortuna/src/keeper.rs Normal file
View File

@ -0,0 +1,487 @@
use {
crate::{
api::{
self,
BlockchainState,
},
chain::{
ethereum::SignablePythContract,
reader::{
BlockNumber,
RequestedWithCallbackEvent,
},
},
config::EthereumConfig,
},
anyhow::{
anyhow,
Result,
},
ethers::{
contract::ContractError,
providers::{
Middleware,
Provider,
Ws,
},
types::U256,
},
futures::StreamExt,
std::sync::Arc,
tokio::{
spawn,
sync::mpsc,
time::{
self,
Duration,
},
},
tracing::{
self,
Instrument,
},
};
#[derive(Debug)]
pub struct BlockRange {
pub from: BlockNumber,
pub to: BlockNumber,
}
/// How much to wait before retrying in case of an RPC error
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// How many blocks to look back for events that might be missed when starting the keeper
const BACKLOG_RANGE: u64 = 1000;
/// How many blocks to fetch events for in a single rpc call
const BLOCK_BATCH_SIZE: u64 = 100;
/// How much to wait before polling the next latest block
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Get the latest safe block number for the chain. Retry internally if there is an error.
async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
loop {
match chain_state
.contract
.get_block_number(chain_state.confirmed_block_status)
.await
{
Ok(latest_confirmed_block) => {
tracing::info!(
"Fetched latest safe block {}",
latest_confirmed_block - chain_state.reveal_delay_blocks
);
return latest_confirmed_block - chain_state.reveal_delay_blocks;
}
Err(e) => {
tracing::error!("Error while getting block number. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
}
}
/// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
/// handle any events for the new blocks.
#[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))]
pub async fn run_keeper_threads(
private_key: String,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
) {
tracing::info!("starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);
let contract = Arc::new(
SignablePythContract::from_config(&chain_eth_config, &private_key)
.await
.expect("Chain config should be valid"),
);
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
spawn(
process_backlog(
BlockRange {
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
to: latest_safe_block,
},
contract.clone(),
chain_eth_config.gas_limit,
chain_state.clone(),
)
.in_current_span(),
);
let (tx, rx) = mpsc::channel::<BlockRange>(1000);
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx,
chain_eth_config.geth_rpc_wss.clone(),
)
.in_current_span(),
);
// Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
spawn(
process_new_blocks(
chain_state.clone(),
rx,
Arc::clone(&contract),
chain_eth_config.gas_limit,
)
.in_current_span(),
);
}
/// Process an event for a chain. It estimates the gas for the reveal with callback and
/// submits the transaction if the gas estimate is below the gas limit.
/// It will return an Error if the gas estimation failed with a provider error or if the
/// reveal with callback failed with a provider error.
pub async fn process_event(
event: RequestedWithCallbackEvent,
chain_config: &BlockchainState,
contract: &Arc<SignablePythContract>,
gas_limit: U256,
) -> Result<()> {
if chain_config.provider_address != event.provider_address {
return Ok(());
}
let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
Ok(result) => result,
Err(e) => {
tracing::error!(
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
return Ok(());
}
};
let gas_estimate_res = chain_config
.contract
.estimate_reveal_with_callback_gas(
event.provider_address,
event.sequence_number,
event.user_random_number,
provider_revelation,
)
.in_current_span()
.await;
match gas_estimate_res {
Ok(gas_estimate_option) => match gas_estimate_option {
Some(gas_estimate) => {
// Pad the gas estimate by 33%
let (gas_estimate, _) = gas_estimate
.saturating_mul(U256::from(4))
.div_mod(U256::from(3));
if gas_estimate > gas_limit {
tracing::error!(
sequence_number = &event.sequence_number,
"Gas estimate for reveal with callback is higher than the gas limit"
);
return Ok(());
}
let contract_call = contract
.reveal_with_callback(
event.provider_address,
event.sequence_number,
event.user_random_number,
provider_revelation,
)
.gas(gas_estimate);
let res = contract_call.send().await;
let pending_tx = match res {
Ok(pending_tx) => pending_tx,
Err(e) => match e {
// If there is a provider error, we weren't able to send the transaction.
// We will return an error. So, that the caller can decide what to do (retry).
ContractError::ProviderError { e } => return Err(e.into()),
// For all the other errors, it is likely the case we won't be able to reveal for
// ever. We will return an Ok(()) to signal that we have processed this reveal
// and concluded that its Ok to not reveal.
_ => {
tracing::error!(
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
return Ok(());
}
},
};
match pending_tx.await {
Ok(res) => {
tracing::info!(
sequence_number = &event.sequence_number,
"Revealed with res: {:?}",
res
);
Ok(())
}
Err(e) => {
tracing::error!(
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
Err(e.into())
}
}
}
None => {
tracing::info!(
sequence_number = &event.sequence_number,
"Not processing event"
);
Ok(())
}
},
Err(e) => {
tracing::error!(
sequence_number = &event.sequence_number,
"Error while simulating reveal with error: {:?}",
e
);
Err(e)
}
}
}
/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
#[tracing::instrument(skip_all, fields(range_from_block=block_range.from, range_to_block=block_range.to))]
pub async fn process_block_range(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
) {
let BlockRange {
from: first_block,
to: last_block,
} = block_range;
let mut current_block = first_block;
while current_block <= last_block {
let mut to_block = current_block + BLOCK_BATCH_SIZE;
if to_block > last_block {
to_block = last_block;
}
process_single_block_batch(
BlockRange {
from: current_block,
to: to_block,
},
contract.clone(),
gas_limit,
chain_state.clone(),
)
.in_current_span()
.await;
current_block = to_block + 1;
}
}
/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
/// and then try to process them one by one. If the process fails, it will retry indefinitely.
#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
pub async fn process_single_block_batch(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
) {
loop {
let events_res = chain_state
.contract
.get_request_with_callback_events(block_range.from, block_range.to)
.await;
match events_res {
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
for event in &events {
tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
while let Err(e) =
process_event(event.clone(), &chain_state, &contract, gas_limit)
.in_current_span()
.await
{
tracing::error!(
sequence_number = &event.sequence_number,
"Error while processing event. Waiting for {} seconds before retry. error: {:?}",
RETRY_INTERVAL.as_secs(),
e
);
time::sleep(RETRY_INTERVAL).await;
}
tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
}
tracing::info!(num_of_events = &events.len(), "Processed",);
break;
}
Err(e) => {
tracing::error!(
"Error while getting events. Waiting for {} seconds before retry. error: {:?}",
RETRY_INTERVAL.as_secs(),
e
);
time::sleep(RETRY_INTERVAL).await;
}
}
}
}
/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay.
/// It retries indefinitely.
#[tracing::instrument(name="watch_blocks", skip_all, fields(initial_safe_block=latest_safe_block))]
pub async fn watch_blocks_wrapper(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) {
let mut last_safe_block_processed = latest_safe_block;
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
&mut last_safe_block_processed,
tx.clone(),
geth_rpc_wss.clone(),
)
.in_current_span()
.await
{
tracing::error!("watching blocks. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
}
/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending
/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
/// know about it.
pub async fn watch_blocks(
chain_state: BlockchainState,
last_safe_block_processed: &mut BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) -> Result<()> {
tracing::info!("Watching blocks to handle new events");
let provider_option = match geth_rpc_wss {
Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
Ok(provider) => provider,
Err(e) => {
tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
return Err(e.into());
}
}),
None => {
tracing::info!("No wss provided");
None
}
};
let mut stream_option = match provider_option {
Some(ref provider) => Some(match provider.subscribe_blocks().await {
Ok(client) => client,
Err(e) => {
tracing::error!("Error while subscribing to blocks. error {:?}", e);
return Err(e.into());
}
}),
None => None,
};
loop {
match stream_option {
Some(ref mut stream) => {
if let None = stream.next().await {
tracing::error!("Error blocks subscription stream ended");
return Err(anyhow!("Error blocks subscription stream ended"));
}
}
None => {
time::sleep(POLL_INTERVAL).await;
}
}
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > *last_safe_block_processed {
match tx
.send(BlockRange {
from: *last_safe_block_processed + 1,
to: latest_safe_block,
})
.await
{
Ok(_) => {
tracing::info!(
from_block = *last_safe_block_processed + 1,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
*last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
};
}
}
}
/// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
#[tracing::instrument(skip_all)]
pub async fn process_new_blocks(
chain_state: BlockchainState,
mut rx: mpsc::Receiver<BlockRange>,
contract: Arc<SignablePythContract>,
gas_limit: U256,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
if let Some(block_range) = rx.recv().await {
process_block_range(
block_range,
Arc::clone(&contract),
gas_limit,
chain_state.clone(),
)
.in_current_span()
.await;
}
}
}
/// Processes the backlog_range for a chain.
#[tracing::instrument(skip_all)]
pub async fn process_backlog(
backlog_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: BlockchainState,
) {
tracing::info!("Processing backlog");
process_block_range(backlog_range, contract, gas_limit, chain_state)
.in_current_span()
.await;
tracing::info!("Backlog processed");
}

View File

@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hermes" name = "hermes"
version = "0.5.5" version = "0.5.9"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -3138,7 +3138,7 @@ dependencies = [
[[package]] [[package]]
name = "pythnet-sdk" name = "pythnet-sdk"
version = "2.0.0" version = "2.1.0"
dependencies = [ dependencies = [
"bincode", "bincode",
"borsh 0.10.3", "borsh 0.10.3",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "hermes" name = "hermes"
version = "0.5.5" version = "0.5.9"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021" edition = "2021"

View File

@ -1,6 +1,5 @@
use { use {
crate::{ crate::{
aggregate::AggregationEvent,
config::RunOptions, config::RunOptions,
state::State, state::State,
}, },
@ -14,7 +13,6 @@ use {
ipnet::IpNet, ipnet::IpNet,
serde_qs::axum::QsQueryConfig, serde_qs::axum::QsQueryConfig,
std::sync::Arc, std::sync::Arc,
tokio::sync::broadcast::Sender,
tower_http::cors::CorsLayer, tower_http::cors::CorsLayer,
utoipa::OpenApi, utoipa::OpenApi,
utoipa_swagger_ui::SwaggerUi, utoipa_swagger_ui::SwaggerUi,
@ -27,10 +25,9 @@ pub mod types;
mod ws; mod ws;
pub struct ApiState<S = State> { pub struct ApiState<S = State> {
pub state: Arc<S>, pub state: Arc<S>,
pub ws: Arc<ws::WsState>, pub ws: Arc<ws::WsState>,
pub metrics: Arc<metrics_middleware::Metrics>, pub metrics: Arc<metrics_middleware::Metrics>,
pub update_tx: Sender<AggregationEvent>,
} }
/// Manually implement `Clone` as the derive macro will try and slap `Clone` on /// Manually implement `Clone` as the derive macro will try and slap `Clone` on
@ -38,10 +35,9 @@ pub struct ApiState<S = State> {
impl<S> Clone for ApiState<S> { impl<S> Clone for ApiState<S> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
state: self.state.clone(), state: self.state.clone(),
ws: self.ws.clone(), ws: self.ws.clone(),
metrics: self.metrics.clone(), metrics: self.metrics.clone(),
update_tx: self.update_tx.clone(),
} }
} }
} }
@ -51,7 +47,6 @@ impl ApiState<State> {
state: Arc<State>, state: Arc<State>,
ws_whitelist: Vec<IpNet>, ws_whitelist: Vec<IpNet>,
requester_ip_header_name: String, requester_ip_header_name: String,
update_tx: Sender<AggregationEvent>,
) -> Self { ) -> Self {
Self { Self {
metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())), metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())),
@ -61,24 +56,18 @@ impl ApiState<State> {
state.clone(), state.clone(),
)), )),
state, state,
update_tx,
} }
} }
} }
#[tracing::instrument(skip(opts, state, update_tx))] #[tracing::instrument(skip(opts, state))]
pub async fn spawn( pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
opts: RunOptions,
state: Arc<State>,
update_tx: Sender<AggregationEvent>,
) -> Result<()> {
let state = { let state = {
let opts = opts.clone(); let opts = opts.clone();
ApiState::new( ApiState::new(
state, state,
opts.rpc.ws_whitelist, opts.rpc.ws_whitelist,
opts.rpc.requester_ip_header_name, opts.rpc.requester_ip_header_name,
update_tx,
) )
}; };
@ -135,6 +124,7 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
// `with_state` method which replaces `Body` with `State` in the type signature. // `with_state` method which replaces `Body` with `State` in the type signature.
let app = Router::new(); let app = Router::new();
#[allow(deprecated)]
let app = app let app = app
.merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi())) .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
.route("/", get(rest::index)) .route("/", get(rest::index))

View File

@ -1,4 +1,4 @@
use crate::aggregate::UnixTimestamp; use crate::state::aggregate::UnixTimestamp;
// Example values for the utoipa API docs. // Example values for the utoipa API docs.
// Note that each of these expressions is only evaluated once when the documentation is created, // Note that each of these expressions is only evaluated once when the documentation is created,

View File

@ -1,5 +1,6 @@
use { use {
super::ApiState, super::ApiState,
crate::state::aggregate::Aggregates,
axum::{ axum::{
http::StatusCode, http::StatusCode,
response::{ response::{
@ -93,11 +94,15 @@ impl IntoResponse for RestError {
} }
/// Verify that the price ids exist in the aggregate state. /// Verify that the price ids exist in the aggregate state.
pub async fn verify_price_ids_exist( pub async fn verify_price_ids_exist<S>(
state: &ApiState, state: &ApiState<S>,
price_ids: &[PriceIdentifier], price_ids: &[PriceIdentifier],
) -> Result<(), RestError> { ) -> Result<(), RestError>
let all_ids = crate::aggregate::get_price_feed_ids(&*state.state).await; where
S: Aggregates,
{
let state = &*state.state;
let all_ids = Aggregates::get_price_feed_ids(state).await;
let missing_ids = price_ids let missing_ids = price_ids
.iter() .iter()
.filter(|id| !all_ids.contains(id)) .filter(|id| !all_ids.contains(id))

View File

@ -1,10 +1,6 @@
use { use {
super::verify_price_ids_exist, super::verify_price_ids_exist,
crate::{ crate::{
aggregate::{
RequestTime,
UnixTimestamp,
},
api::{ api::{
doc_examples, doc_examples,
rest::RestError, rest::RestError,
@ -12,6 +8,12 @@ use {
PriceIdInput, PriceIdInput,
RpcPriceFeed, RpcPriceFeed,
}, },
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
UnixTimestamp,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -47,6 +49,8 @@ pub struct GetPriceFeedQueryParams {
binary: bool, binary: bool,
} }
/// **Deprecated: use /v2/updates/price/{publish_time} instead**
///
/// Get a price update for a price feed with a specific timestamp /// Get a price update for a price feed with a specific timestamp
/// ///
/// Given a price feed id and timestamp, retrieve the Pyth price update closest to that timestamp. /// Given a price feed id and timestamp, retrieve the Pyth price update closest to that timestamp.
@ -60,16 +64,20 @@ pub struct GetPriceFeedQueryParams {
GetPriceFeedQueryParams GetPriceFeedQueryParams
) )
)] )]
pub async fn get_price_feed( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn get_price_feed<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<GetPriceFeedQueryParams>, QsQuery(params): QsQuery<GetPriceFeedQueryParams>,
) -> Result<Json<RpcPriceFeed>, RestError> { ) -> Result<Json<RpcPriceFeed>, RestError>
where
S: Aggregates,
{
let price_id: PriceIdentifier = params.id.into(); let price_id: PriceIdentifier = params.id.into();
verify_price_ids_exist(&state, &[price_id]).await?; verify_price_ids_exist(&state, &[price_id]).await?;
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
state,
&[price_id], &[price_id],
RequestTime::FirstAfter(params.publish_time), RequestTime::FirstAfter(params.publish_time),
) )

View File

@ -1,15 +1,16 @@
use { use {
super::verify_price_ids_exist, super::verify_price_ids_exist,
crate::{ crate::{
aggregate::{
get_price_feeds_with_update_data,
RequestTime,
UnixTimestamp,
},
api::{ api::{
doc_examples, doc_examples,
rest::RestError, rest::RestError,
types::PriceIdInput, types::PriceIdInput,
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
UnixTimestamp,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -54,6 +55,8 @@ pub struct GetVaaResponse {
publish_time: UnixTimestamp, publish_time: UnixTimestamp,
} }
/// **Deprecated: use /v2/updates/price/{publish_time} instead**
///
/// Get a VAA for a price feed with a specific timestamp /// Get a VAA for a price feed with a specific timestamp
/// ///
/// Given a price feed id and timestamp, retrieve the Pyth price update closest to that timestamp. /// Given a price feed id and timestamp, retrieve the Pyth price update closest to that timestamp.
@ -68,16 +71,20 @@ pub struct GetVaaResponse {
GetVaaQueryParams GetVaaQueryParams
) )
)] )]
pub async fn get_vaa( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn get_vaa<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<GetVaaQueryParams>, QsQuery(params): QsQuery<GetVaaQueryParams>,
) -> Result<Json<GetVaaResponse>, RestError> { ) -> Result<Json<GetVaaResponse>, RestError>
where
S: Aggregates,
{
let price_id: PriceIdentifier = params.id.into(); let price_id: PriceIdentifier = params.id.into();
verify_price_ids_exist(&state, &[price_id]).await?; verify_price_ids_exist(&state, &[price_id]).await?;
let price_feeds_with_update_data = get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
state,
&[price_id], &[price_id],
RequestTime::FirstAfter(params.publish_time), RequestTime::FirstAfter(params.publish_time),
) )

View File

@ -1,11 +1,15 @@
use { use {
super::verify_price_ids_exist, super::verify_price_ids_exist,
crate::{ crate::{
aggregate::{ api::{
rest::RestError,
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime, RequestTime,
UnixTimestamp, UnixTimestamp,
}, },
api::rest::RestError,
}, },
anyhow::Result, anyhow::Result,
axum::{ axum::{
@ -42,6 +46,8 @@ pub struct GetVaaCcipResponse {
data: String, // TODO: Use a typed wrapper for the hex output with leading 0x. data: String, // TODO: Use a typed wrapper for the hex output with leading 0x.
} }
/// **Deprecated: use /v2/updates/price/{publish_time} instead**
///
/// Get a VAA for a price feed using CCIP /// Get a VAA for a price feed using CCIP
/// ///
/// This endpoint accepts a single argument which is a hex-encoded byte string of the following form: /// This endpoint accepts a single argument which is a hex-encoded byte string of the following form:
@ -56,25 +62,30 @@ pub struct GetVaaCcipResponse {
GetVaaCcipQueryParams GetVaaCcipQueryParams
) )
)] )]
pub async fn get_vaa_ccip( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn get_vaa_ccip<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<GetVaaCcipQueryParams>, QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
) -> Result<Json<GetVaaCcipResponse>, RestError> { ) -> Result<Json<GetVaaCcipResponse>, RestError>
where
S: Aggregates,
{
let price_id: PriceIdentifier = PriceIdentifier::new( let price_id: PriceIdentifier = PriceIdentifier::new(
params.data[0..32] params.data[0..32]
.try_into() .try_into()
.map_err(|_| RestError::InvalidCCIPInput)?, .map_err(|_| RestError::InvalidCCIPInput)?,
); );
verify_price_ids_exist(&state, &[price_id]).await?;
let publish_time = UnixTimestamp::from_be_bytes( let publish_time = UnixTimestamp::from_be_bytes(
params.data[32..40] params.data[32..40]
.try_into() .try_into()
.map_err(|_| RestError::InvalidCCIPInput)?, .map_err(|_| RestError::InvalidCCIPInput)?,
); );
verify_price_ids_exist(&state, &[price_id]).await?; let state = &*state.state;
let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( state,
&*state.state,
&[price_id], &[price_id],
RequestTime::FirstAfter(publish_time), RequestTime::FirstAfter(publish_time),
) )

View File

@ -1,13 +1,17 @@
use { use {
super::verify_price_ids_exist, super::verify_price_ids_exist,
crate::{ crate::{
aggregate::RequestTime,
api::{ api::{
rest::RestError, rest::RestError,
types::{ types::{
PriceIdInput, PriceIdInput,
RpcPriceFeed, RpcPriceFeed,
}, },
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -46,6 +50,8 @@ pub struct LatestPriceFeedsQueryParams {
binary: bool, binary: bool,
} }
/// **Deprecated: use /v2/updates/price/latest instead**
///
/// Get the latest price updates by price feed id. /// Get the latest price updates by price feed id.
/// ///
/// Given a collection of price feed ids, retrieve the latest Pyth price for each price feed. /// Given a collection of price feed ids, retrieve the latest Pyth price for each price feed.
@ -59,28 +65,29 @@ pub struct LatestPriceFeedsQueryParams {
LatestPriceFeedsQueryParams LatestPriceFeedsQueryParams
) )
)] )]
pub async fn latest_price_feeds( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn latest_price_feeds<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>, QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
) -> Result<Json<Vec<RpcPriceFeed>>, RestError> { ) -> Result<Json<Vec<RpcPriceFeed>>, RestError>
where
S: Aggregates,
{
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
verify_price_ids_exist(&state, &price_ids).await?; verify_price_ids_exist(&state, &price_ids).await?;
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data =
&price_ids, Aggregates::get_price_feeds_with_update_data(state, &price_ids, RequestTime::Latest)
RequestTime::Latest, .await
) .map_err(|e| {
.await tracing::warn!(
.map_err(|e| { "Error getting price feeds {:?} with update data: {:?}",
tracing::warn!( price_ids,
"Error getting price feeds {:?} with update data: {:?}", e
price_ids, );
e RestError::UpdateDataNotFound
); })?;
RestError::UpdateDataNotFound
})?;
Ok(Json( Ok(Json(
price_feeds_with_update_data price_feeds_with_update_data

View File

@ -1,11 +1,15 @@
use { use {
super::verify_price_ids_exist, super::verify_price_ids_exist,
crate::{ crate::{
aggregate::RequestTime,
api::{ api::{
doc_examples, doc_examples,
rest::RestError, rest::RestError,
types::PriceIdInput, types::PriceIdInput,
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -39,6 +43,8 @@ pub struct LatestVaasQueryParams {
} }
/// **Deprecated: use /v2/updates/price/latest instead**
///
/// Get VAAs for a set of price feed ids. /// Get VAAs for a set of price feed ids.
/// ///
/// Given a collection of price feed ids, retrieve the latest VAA for each. The returned VAA(s) can /// Given a collection of price feed ids, retrieve the latest VAA for each. The returned VAA(s) can
@ -54,28 +60,29 @@ pub struct LatestVaasQueryParams {
(status = 200, description = "VAAs retrieved successfully", body = Vec<String>, example=json!([doc_examples::vaa_example()])) (status = 200, description = "VAAs retrieved successfully", body = Vec<String>, example=json!([doc_examples::vaa_example()]))
), ),
)] )]
pub async fn latest_vaas( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn latest_vaas<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<LatestVaasQueryParams>, QsQuery(params): QsQuery<LatestVaasQueryParams>,
) -> Result<Json<Vec<String>>, RestError> { ) -> Result<Json<Vec<String>>, RestError>
where
S: Aggregates,
{
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
verify_price_ids_exist(&state, &price_ids).await?; verify_price_ids_exist(&state, &price_ids).await?;
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data =
&price_ids, Aggregates::get_price_feeds_with_update_data(state, &price_ids, RequestTime::Latest)
RequestTime::Latest, .await
) .map_err(|e| {
.await tracing::warn!(
.map_err(|e| { "Error getting price feeds {:?} with update data: {:?}",
tracing::warn!( price_ids,
"Error getting price feeds {:?} with update data: {:?}", e
price_ids, );
e RestError::UpdateDataNotFound
); })?;
RestError::UpdateDataNotFound
})?;
Ok(Json( Ok(Json(
price_feeds_with_update_data price_feeds_with_update_data

View File

@ -1,7 +1,11 @@
use { use {
crate::api::{ crate::{
rest::RestError, api::{
types::RpcPriceIdentifier, rest::RestError,
types::RpcPriceIdentifier,
ApiState,
},
state::aggregate::Aggregates,
}, },
anyhow::Result, anyhow::Result,
axum::{ axum::{
@ -10,6 +14,8 @@ use {
}, },
}; };
/// **Deprecated: use /v2/price_feeds instead**
///
/// Get the set of price feed IDs. /// Get the set of price feed IDs.
/// ///
/// This endpoint fetches all of the price feed IDs for which price updates can be retrieved. /// This endpoint fetches all of the price feed IDs for which price updates can be retrieved.
@ -21,10 +27,15 @@ use {
(status = 200, description = "Price feed ids retrieved successfully", body = Vec<RpcPriceIdentifier>) (status = 200, description = "Price feed ids retrieved successfully", body = Vec<RpcPriceIdentifier>)
), ),
)] )]
pub async fn price_feed_ids( #[deprecated]
State(state): State<crate::api::ApiState>, pub async fn price_feed_ids<S>(
) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError> { State(state): State<ApiState<S>>,
let price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state) ) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError>
where
S: Aggregates,
{
let state = &*state.state;
let price_feed_ids = Aggregates::get_price_feed_ids(state)
.await .await
.into_iter() .into_iter()
.map(RpcPriceIdentifier::from) .map(RpcPriceIdentifier::from)

View File

@ -1,14 +1,24 @@
use axum::{ use {
extract::State, crate::{
http::StatusCode, api::ApiState,
response::{ state::aggregate::Aggregates,
IntoResponse, },
Response, axum::{
extract::State,
http::StatusCode,
response::{
IntoResponse,
Response,
},
}, },
}; };
pub async fn ready(State(state): State<crate::api::ApiState>) -> Response { pub async fn ready<S>(State(state): State<ApiState<S>>) -> Response
match crate::aggregate::is_ready(&state.state).await { where
S: Aggregates,
{
let state = &*state.state;
match Aggregates::is_ready(state).await {
true => (StatusCode::OK, "OK").into_response(), true => (StatusCode::OK, "OK").into_response(),
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(), false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
} }

View File

@ -1,6 +1,5 @@
use { use {
crate::{ crate::{
aggregate::RequestTime,
api::{ api::{
rest::{ rest::{
verify_price_ids_exist, verify_price_ids_exist,
@ -13,6 +12,11 @@ use {
PriceIdInput, PriceIdInput,
PriceUpdate, PriceUpdate,
}, },
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -73,28 +77,28 @@ fn default_true() -> bool {
LatestPriceUpdatesQueryParams LatestPriceUpdatesQueryParams
) )
)] )]
pub async fn latest_price_updates( pub async fn latest_price_updates<S>(
State(state): State<crate::api::ApiState>, State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<LatestPriceUpdatesQueryParams>, QsQuery(params): QsQuery<LatestPriceUpdatesQueryParams>,
) -> Result<Json<PriceUpdate>, RestError> { ) -> Result<Json<PriceUpdate>, RestError>
where
S: Aggregates,
{
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
verify_price_ids_exist(&state, &price_ids).await?; verify_price_ids_exist(&state, &price_ids).await?;
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data =
&price_ids, Aggregates::get_price_feeds_with_update_data(state, &price_ids, RequestTime::Latest)
RequestTime::Latest, .await
) .map_err(|e| {
.await tracing::warn!(
.map_err(|e| { "Error getting price feeds {:?} with update data: {:?}",
tracing::warn!( price_ids,
"Error getting price feeds {:?} with update data: {:?}", e
price_ids, );
e RestError::UpdateDataNotFound
); })?;
RestError::UpdateDataNotFound
})?;
let price_update_data = price_feeds_with_update_data.update_data; let price_update_data = price_feeds_with_update_data.update_data;
let encoded_data: Vec<String> = price_update_data let encoded_data: Vec<String> = price_update_data

View File

@ -1,9 +1,5 @@
use { use {
crate::{ crate::{
aggregate::{
AggregationEvent,
RequestTime,
},
api::{ api::{
rest::{ rest::{
verify_price_ids_exist, verify_price_ids_exist,
@ -19,6 +15,11 @@ use {
}, },
ApiState, ApiState,
}, },
state::aggregate::{
Aggregates,
AggregationEvent,
RequestTime,
},
}, },
anyhow::Result, anyhow::Result,
axum::{ axum::{
@ -88,16 +89,22 @@ fn default_true() -> bool {
params(StreamPriceUpdatesQueryParams) params(StreamPriceUpdatesQueryParams)
)] )]
/// SSE route handler for streaming price updates. /// SSE route handler for streaming price updates.
pub async fn price_stream_sse_handler( pub async fn price_stream_sse_handler<S>(
State(state): State<ApiState>, State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<StreamPriceUpdatesQueryParams>, QsQuery(params): QsQuery<StreamPriceUpdatesQueryParams>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RestError> { ) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, RestError>
where
S: Aggregates,
S: Sync,
S: Send,
S: 'static,
{
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(Into::into).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(Into::into).collect();
verify_price_ids_exist(&state, &price_ids).await?; verify_price_ids_exist(&state, &price_ids).await?;
// Clone the update_tx receiver to listen for new price updates // Clone the update_tx receiver to listen for new price updates
let update_rx: broadcast::Receiver<AggregationEvent> = state.update_tx.subscribe(); let update_rx: broadcast::Receiver<AggregationEvent> = Aggregates::subscribe(&*state.state);
// Convert the broadcast receiver into a Stream // Convert the broadcast receiver into a Stream
let stream = BroadcastStream::new(update_rx); let stream = BroadcastStream::new(update_rx);
@ -134,15 +141,18 @@ pub async fn price_stream_sse_handler(
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
} }
async fn handle_aggregation_event( async fn handle_aggregation_event<S>(
event: AggregationEvent, event: AggregationEvent,
state: ApiState, state: ApiState<S>,
mut price_ids: Vec<PriceIdentifier>, mut price_ids: Vec<PriceIdentifier>,
encoding: EncodingType, encoding: EncodingType,
parsed: bool, parsed: bool,
benchmarks_only: bool, benchmarks_only: bool,
allow_unordered: bool, allow_unordered: bool,
) -> Result<Option<PriceUpdate>> { ) -> Result<Option<PriceUpdate>>
where
S: Aggregates,
{
// Handle out-of-order events // Handle out-of-order events
if let AggregationEvent::OutOfOrder { .. } = event { if let AggregationEvent::OutOfOrder { .. } = event {
if !allow_unordered { if !allow_unordered {
@ -151,11 +161,11 @@ async fn handle_aggregation_event(
} }
// We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed. // We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed.
let available_price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state).await; let available_price_feed_ids = Aggregates::get_price_feed_ids(&*state.state).await;
price_ids.retain(|price_feed_id| available_price_feed_ids.contains(price_feed_id)); price_ids.retain(|price_feed_id| available_price_feed_ids.contains(price_feed_id));
let mut price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let mut price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
&*state.state, &*state.state,
&price_ids, &price_ids,
RequestTime::AtSlot(event.slot()), RequestTime::AtSlot(event.slot()),
@ -185,7 +195,7 @@ async fn handle_aggregation_event(
.iter() .iter()
.any(|price_feed| price_feed.id == RpcPriceIdentifier::from(*price_id)) .any(|price_feed| price_feed.id == RpcPriceIdentifier::from(*price_id))
}); });
price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
&*state.state, &*state.state,
&price_ids, &price_ids,
RequestTime::AtSlot(event.slot()), RequestTime::AtSlot(event.slot()),

View File

@ -1,9 +1,5 @@
use { use {
crate::{ crate::{
aggregate::{
RequestTime,
UnixTimestamp,
},
api::{ api::{
doc_examples, doc_examples,
rest::{ rest::{
@ -17,6 +13,12 @@ use {
PriceIdInput, PriceIdInput,
PriceUpdate, PriceUpdate,
}, },
ApiState,
},
state::aggregate::{
Aggregates,
RequestTime,
UnixTimestamp,
}, },
}, },
anyhow::Result, anyhow::Result,
@ -87,18 +89,22 @@ fn default_true() -> bool {
TimestampPriceUpdatesQueryParams TimestampPriceUpdatesQueryParams
) )
)] )]
pub async fn timestamp_price_updates( pub async fn timestamp_price_updates<S>(
State(state): State<crate::api::ApiState>, State(state): State<ApiState<S>>,
Path(path_params): Path<TimestampPriceUpdatesPathParams>, Path(path_params): Path<TimestampPriceUpdatesPathParams>,
QsQuery(query_params): QsQuery<TimestampPriceUpdatesQueryParams>, QsQuery(query_params): QsQuery<TimestampPriceUpdatesQueryParams>,
) -> Result<Json<PriceUpdate>, RestError> { ) -> Result<Json<PriceUpdate>, RestError>
where
S: Aggregates,
{
let price_ids: Vec<PriceIdentifier> = let price_ids: Vec<PriceIdentifier> =
query_params.ids.into_iter().map(|id| id.into()).collect(); query_params.ids.into_iter().map(|id| id.into()).collect();
verify_price_ids_exist(&state, &price_ids).await?; verify_price_ids_exist(&state, &price_ids).await?;
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( let state = &*state.state;
&*state.state, let price_feeds_with_update_data = Aggregates::get_price_feeds_with_update_data(
state,
&price_ids, &price_ids,
RequestTime::FirstAfter(path_params.publish_time), RequestTime::FirstAfter(path_params.publish_time),
) )

View File

@ -1,6 +1,6 @@
use { use {
super::doc_examples, super::doc_examples,
crate::aggregate::{ crate::state::aggregate::{
PriceFeedUpdate, PriceFeedUpdate,
PriceFeedsWithUpdateData, PriceFeedsWithUpdateData,
Slot, Slot,

View File

@ -1,14 +1,18 @@
use { use {
super::types::{ super::{
PriceIdInput, types::{
RpcPriceFeed, PriceIdInput,
RpcPriceFeed,
},
ApiState,
}, },
crate::{ crate::state::{
aggregate::{ aggregate::{
Aggregates,
AggregationEvent, AggregationEvent,
RequestTime, RequestTime,
}, },
state::State, State,
}, },
anyhow::{ anyhow::{
anyhow, anyhow,
@ -212,11 +216,10 @@ pub async fn ws_route_handler(
} }
#[tracing::instrument(skip(stream, state, subscriber_ip))] #[tracing::instrument(skip(stream, state, subscriber_ip))]
async fn websocket_handler( async fn websocket_handler<S>(stream: WebSocket, state: ApiState<S>, subscriber_ip: Option<IpAddr>)
stream: WebSocket, where
state: super::ApiState, S: Aggregates,
subscriber_ip: Option<IpAddr>, {
) {
let ws_state = state.ws.clone(); let ws_state = state.ws.clone();
// Retain the recent rate limit data for the IP addresses to // Retain the recent rate limit data for the IP addresses to
@ -235,7 +238,7 @@ async fn websocket_handler(
}) })
.inc(); .inc();
let notify_receiver = state.update_tx.subscribe(); let notify_receiver = Aggregates::subscribe(&*state.state);
let (sender, receiver) = stream.split(); let (sender, receiver) = stream.split();
let mut subscriber = Subscriber::new( let mut subscriber = Subscriber::new(
id, id,
@ -254,11 +257,11 @@ pub type SubscriberId = usize;
/// Subscriber is an actor that handles a single websocket connection. /// Subscriber is an actor that handles a single websocket connection.
/// It listens to the store for updates and sends them to the client. /// It listens to the store for updates and sends them to the client.
pub struct Subscriber { pub struct Subscriber<S> {
id: SubscriberId, id: SubscriberId,
ip_addr: Option<IpAddr>, ip_addr: Option<IpAddr>,
closed: bool, closed: bool,
store: Arc<State>, state: Arc<S>,
ws_state: Arc<WsState>, ws_state: Arc<WsState>,
notify_receiver: Receiver<AggregationEvent>, notify_receiver: Receiver<AggregationEvent>,
receiver: SplitStream<WebSocket>, receiver: SplitStream<WebSocket>,
@ -269,11 +272,14 @@ pub struct Subscriber {
responded_to_ping: bool, responded_to_ping: bool,
} }
impl Subscriber { impl<S> Subscriber<S>
where
S: Aggregates,
{
pub fn new( pub fn new(
id: SubscriberId, id: SubscriberId,
ip_addr: Option<IpAddr>, ip_addr: Option<IpAddr>,
store: Arc<State>, state: Arc<S>,
ws_state: Arc<WsState>, ws_state: Arc<WsState>,
notify_receiver: Receiver<AggregationEvent>, notify_receiver: Receiver<AggregationEvent>,
receiver: SplitStream<WebSocket>, receiver: SplitStream<WebSocket>,
@ -283,7 +289,7 @@ impl Subscriber {
id, id,
ip_addr, ip_addr,
closed: false, closed: false,
store, state,
ws_state, ws_state,
notify_receiver, notify_receiver,
receiver, receiver,
@ -350,8 +356,9 @@ impl Subscriber {
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let updates = match crate::aggregate::get_price_feeds_with_update_data( let state = &*self.state;
&*self.store, let updates = match Aggregates::get_price_feeds_with_update_data(
state,
&price_feed_ids, &price_feed_ids,
RequestTime::AtSlot(event.slot()), RequestTime::AtSlot(event.slot()),
) )
@ -364,8 +371,7 @@ impl Subscriber {
// subscription. In this case we just remove the non-existing // subscription. In this case we just remove the non-existing
// price feed from the list and will keep sending updates for // price feed from the list and will keep sending updates for
// the rest. // the rest.
let available_price_feed_ids = let available_price_feed_ids = Aggregates::get_price_feed_ids(state).await;
crate::aggregate::get_price_feed_ids(&*self.store).await;
self.price_feeds_with_config self.price_feeds_with_config
.retain(|price_feed_id, _| available_price_feed_ids.contains(price_feed_id)); .retain(|price_feed_id, _| available_price_feed_ids.contains(price_feed_id));
@ -376,8 +382,8 @@ impl Subscriber {
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
crate::aggregate::get_price_feeds_with_update_data( Aggregates::get_price_feeds_with_update_data(
&*self.store, state,
&price_feed_ids, &price_feed_ids,
RequestTime::AtSlot(event.slot()), RequestTime::AtSlot(event.slot()),
) )
@ -545,7 +551,7 @@ impl Subscriber {
allow_out_of_order, allow_out_of_order,
}) => { }) => {
let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect();
let available_price_ids = crate::aggregate::get_price_feed_ids(&*self.store).await; let available_price_ids = Aggregates::get_price_feed_ids(&*self.state).await;
let not_found_price_ids: Vec<&PriceIdentifier> = price_ids let not_found_price_ids: Vec<&PriceIdentifier> = price_ids
.iter() .iter()

View File

@ -19,9 +19,9 @@ pub struct Options {
#[arg(env = "PYTHNET_HTTP_ADDR")] #[arg(env = "PYTHNET_HTTP_ADDR")]
pub http_addr: String, pub http_addr: String,
/// Pyth mapping account address. /// Pyth mapping account address on Pythnet.
#[arg(long = "mapping-address")] #[arg(long = "pythnet-mapping-addr")]
#[arg(default_value = DEFAULT_PYTHNET_MAPPING_ADDR)] #[arg(default_value = DEFAULT_PYTHNET_MAPPING_ADDR)]
#[arg(env = "MAPPING_ADDRESS")] #[arg(env = "PYTHNET_MAPPING_ADDR")]
pub mapping_addr: Pubkey, pub mapping_addr: Pubkey,
} }

View File

@ -17,7 +17,6 @@ use {
}, },
}; };
mod aggregate;
mod api; mod api;
mod config; mod config;
mod metrics_server; mod metrics_server;
@ -54,7 +53,7 @@ async fn init() -> Result<()> {
let (update_tx, _) = tokio::sync::broadcast::channel(1000); let (update_tx, _) = tokio::sync::broadcast::channel(1000);
// Initialize a cache store with a 1000 element circular buffer. // Initialize a cache store with a 1000 element circular buffer.
let store = State::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone()); let state = State::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone());
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
spawn(async move { spawn(async move {
@ -67,10 +66,10 @@ async fn init() -> Result<()> {
// Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown // Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown
// signal has been observed). // signal has been observed).
let tasks = join_all(vec![ let tasks = join_all(vec![
spawn(network::wormhole::spawn(opts.clone(), store.clone())), spawn(network::wormhole::spawn(opts.clone(), state.clone())),
spawn(network::pythnet::spawn(opts.clone(), store.clone())), spawn(network::pythnet::spawn(opts.clone(), state.clone())),
spawn(metrics_server::run(opts.clone(), store.clone())), spawn(metrics_server::run(opts.clone(), state.clone())),
spawn(api::spawn(opts.clone(), store.clone(), update_tx)), spawn(api::spawn(opts.clone(), state.clone())),
]) ])
.await; .await;

View File

@ -4,10 +4,6 @@
use { use {
crate::{ crate::{
aggregate::{
AccumulatorMessages,
Update,
},
api::types::PriceFeedMetadata, api::types::PriceFeedMetadata,
config::RunOptions, config::RunOptions,
network::wormhole::{ network::wormhole::{
@ -20,7 +16,14 @@ use {
PriceFeedMeta, PriceFeedMeta,
DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL, DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL,
}, },
state::State, state::{
aggregate::{
AccumulatorMessages,
Aggregates,
Update,
},
State,
},
}, },
anyhow::{ anyhow::{
anyhow, anyhow,
@ -136,7 +139,7 @@ async fn fetch_bridge_data(
} }
} }
pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> { pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<!> {
let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?; let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
let config = RpcProgramAccountsConfig { let config = RpcProgramAccountsConfig {
@ -157,59 +160,54 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
.program_subscribe(&system_program::id(), Some(config)) .program_subscribe(&system_program::id(), Some(config))
.await?; .await?;
loop { while let Some(update) = notif.next().await {
match notif.next().await { let account: Account = match update.value.account.decode() {
Some(update) => { Some(account) => account,
let account: Account = match update.value.account.decode() {
Some(account) => account,
None => {
tracing::error!(?update, "Failed to decode account from update.");
continue;
}
};
let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
match accumulator_messages {
Ok(accumulator_messages) => {
let (candidate, _) = Pubkey::find_program_address(
&[
b"AccumulatorState",
&accumulator_messages.ring_index().to_be_bytes(),
],
&system_program::id(),
);
if candidate.to_string() == update.value.pubkey {
let store = store.clone();
tokio::spawn(async move {
if let Err(err) = crate::aggregate::store_update(
&store,
Update::AccumulatorMessages(accumulator_messages),
)
.await
{
tracing::error!(error = ?err, "Failed to store accumulator messages.");
}
});
} else {
tracing::error!(
?candidate,
?update.value.pubkey,
"Failed to verify message public keys.",
);
}
}
Err(err) => {
tracing::error!(error = ?err, "Failed to parse AccumulatorMessages.");
}
};
}
None => { None => {
return Err(anyhow!("Pythnet network listener terminated")); tracing::error!(?update, "Failed to decode account from update.");
continue;
} }
} };
let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
match accumulator_messages {
Ok(accumulator_messages) => {
let (candidate, _) = Pubkey::find_program_address(
&[
b"AccumulatorState",
&accumulator_messages.ring_index().to_be_bytes(),
],
&system_program::id(),
);
if candidate.to_string() == update.value.pubkey {
let store = store.clone();
tokio::spawn(async move {
if let Err(err) = Aggregates::store_update(
&*store,
Update::AccumulatorMessages(accumulator_messages),
)
.await
{
tracing::error!(error = ?err, "Failed to store accumulator messages.");
}
});
} else {
tracing::error!(
?candidate,
?update.value.pubkey,
"Failed to verify message public keys.",
);
}
}
Err(err) => {
tracing::error!(error = ?err, "Failed to parse AccumulatorMessages.");
}
};
} }
Err(anyhow!("Pythnet network listener connection terminated"))
} }
/// Fetch existing GuardianSet accounts from Wormhole. /// Fetch existing GuardianSet accounts from Wormhole.

View File

@ -7,7 +7,13 @@
use { use {
crate::{ crate::{
config::RunOptions, config::RunOptions,
state::State, state::{
aggregate::{
Aggregates,
Update,
},
State,
},
}, },
anyhow::{ anyhow::{
anyhow, anyhow,
@ -43,7 +49,11 @@ use {
Digest, Digest,
Keccak256, Keccak256,
}, },
std::sync::Arc, std::{
sync::Arc,
time::Duration,
},
tokio::time::Instant,
tonic::Request, tonic::Request,
wormhole_sdk::{ wormhole_sdk::{
vaa::{ vaa::{
@ -100,10 +110,10 @@ pub struct BridgeConfig {
/// GuardianSetData extracted from wormhole bridge account, due to no API. /// GuardianSetData extracted from wormhole bridge account, due to no API.
#[derive(borsh::BorshDeserialize)] #[derive(borsh::BorshDeserialize)]
pub struct GuardianSetData { pub struct GuardianSetData {
pub index: u32, pub _index: u32,
pub keys: Vec<[u8; 20]>, pub keys: Vec<[u8; 20]>,
pub creation_time: u32, pub _creation_time: u32,
pub expiration_time: u32, pub _expiration_time: u32,
} }
/// Update the guardian set with the given ID in the state. /// Update the guardian set with the given ID in the state.
@ -152,10 +162,16 @@ mod proto {
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> { pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
let mut exit = crate::EXIT.subscribe(); let mut exit = crate::EXIT.subscribe();
loop { loop {
let current_time = Instant::now();
tokio::select! { tokio::select! {
_ = exit.changed() => break, _ = exit.changed() => break,
Err(err) = run(opts.clone(), state.clone()) => { Err(err) = run(opts.clone(), state.clone()) => {
tracing::error!(error = ?err, "Wormhole gRPC service failed."); tracing::error!(error = ?err, "Wormhole gRPC service failed.");
if current_time.elapsed() < Duration::from_secs(30) {
tracing::error!("Wormhole listener restarting too quickly. Sleep 1s.");
tokio::time::sleep(Duration::from_secs(1)).await;
}
} }
} }
} }
@ -164,7 +180,7 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
} }
#[tracing::instrument(skip(opts, state))] #[tracing::instrument(skip(opts, state))]
async fn run(opts: RunOptions, state: Arc<State>) -> Result<()> { async fn run(opts: RunOptions, state: Arc<State>) -> Result<!> {
let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?; let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?;
let mut stream = client let mut stream = client
.subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest { .subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest {
@ -184,7 +200,7 @@ async fn run(opts: RunOptions, state: Arc<State>) -> Result<()> {
} }
} }
Ok(()) Err(anyhow!("Wormhole gRPC stream terminated."))
} }
/// Process a message received via a Wormhole gRPC connection. /// Process a message received via a Wormhole gRPC connection.
@ -352,9 +368,7 @@ pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) {
} }
// Hand the VAA to the aggregate store. // Hand the VAA to the aggregate store.
if let Err(e) = if let Err(e) = Aggregates::store_update(&*state, Update::Vaa(vaa_bytes)).await {
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
{
tracing::error!(error = ?e, "Failed to store VAA in aggregate store."); tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
} }
} }

View File

@ -31,6 +31,7 @@ impl<'a> From<&'a State> for &'a PriceFeedMetaState {
} }
} }
#[async_trait::async_trait]
pub trait PriceFeedMeta { pub trait PriceFeedMeta {
async fn retrieve_price_feeds_metadata(&self) -> Result<Vec<PriceFeedMetadata>>; async fn retrieve_price_feeds_metadata(&self) -> Result<Vec<PriceFeedMetadata>>;
async fn store_price_feeds_metadata( async fn store_price_feeds_metadata(
@ -44,6 +45,7 @@ pub trait PriceFeedMeta {
) -> Result<Vec<PriceFeedMetadata>>; ) -> Result<Vec<PriceFeedMetadata>>;
} }
#[async_trait::async_trait]
impl<T> PriceFeedMeta for T impl<T> PriceFeedMeta for T
where where
for<'a> &'a T: Into<&'a PriceFeedMetaState>, for<'a> &'a T: Into<&'a PriceFeedMetaState>,

View File

@ -2,14 +2,14 @@
use { use {
self::{ self::{
benchmarks::BenchmarksState,
cache::CacheState,
},
crate::{
aggregate::{ aggregate::{
AggregateState, AggregateState,
AggregationEvent, AggregationEvent,
}, },
benchmarks::BenchmarksState,
cache::CacheState,
},
crate::{
network::wormhole::GuardianSet, network::wormhole::GuardianSet,
price_feeds_metadata::PriceFeedMetaState, price_feeds_metadata::PriceFeedMetaState,
}, },
@ -28,6 +28,7 @@ use {
}, },
}; };
pub mod aggregate;
pub mod benchmarks; pub mod benchmarks;
pub mod cache; pub mod cache;
@ -41,6 +42,9 @@ pub struct State {
/// State for the `PriceFeedMeta` service for looking up metadata related to Pyth price feeds. /// State for the `PriceFeedMeta` service for looking up metadata related to Pyth price feeds.
pub price_feed_meta: PriceFeedMetaState, pub price_feed_meta: PriceFeedMetaState,
/// State for accessing/storing Pyth price aggregates.
pub aggregates: AggregateState,
/// Sequence numbers of lately observed Vaas. Store uses this set /// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost. /// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>, pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
@ -48,12 +52,6 @@ pub struct State {
/// Wormhole guardian sets. It is used to verify Vaas before using them. /// Wormhole guardian sets. It is used to verify Vaas before using them.
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>, pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
/// The sender to the channel between Store and Api to notify completed updates.
pub api_update_tx: Sender<AggregationEvent>,
/// The aggregate module state.
pub aggregate_state: RwLock<AggregateState>,
/// Metrics registry /// Metrics registry
pub metrics_registry: RwLock<Registry>, pub metrics_registry: RwLock<Registry>,
} }
@ -69,10 +67,9 @@ impl State {
cache: CacheState::new(cache_size), cache: CacheState::new(cache_size),
benchmarks: BenchmarksState::new(benchmarks_endpoint), benchmarks: BenchmarksState::new(benchmarks_endpoint),
price_feed_meta: PriceFeedMetaState::new(), price_feed_meta: PriceFeedMetaState::new(),
aggregates: AggregateState::new(update_tx, &mut metrics_registry),
observed_vaa_seqs: RwLock::new(Default::default()), observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()), guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
metrics_registry: RwLock::new(metrics_registry), metrics_registry: RwLock::new(metrics_registry),
}) })
} }

View File

@ -20,6 +20,7 @@ use {
}, },
crate::{ crate::{
network::wormhole::VaaBytes, network::wormhole::VaaBytes,
price_feeds_metadata::PriceFeedMeta,
state::{ state::{
benchmarks::Benchmarks, benchmarks::Benchmarks,
cache::{ cache::{
@ -59,6 +60,13 @@ use {
collections::HashSet, collections::HashSet,
time::Duration, time::Duration,
}, },
tokio::sync::{
broadcast::{
Receiver,
Sender,
},
RwLock,
},
wormhole_sdk::Vaa, wormhole_sdk::Vaa,
}; };
@ -102,8 +110,7 @@ impl AggregationEvent {
} }
} }
#[derive(Clone, Debug)] pub struct AggregateStateData {
pub struct AggregateState {
/// The latest completed slot. This is used to check whether a completed state is new or out of /// The latest completed slot. This is used to check whether a completed state is new or out of
/// order. /// order.
pub latest_completed_slot: Option<Slot>, pub latest_completed_slot: Option<Slot>,
@ -119,7 +126,7 @@ pub struct AggregateState {
pub metrics: metrics::Metrics, pub metrics: metrics::Metrics,
} }
impl AggregateState { impl AggregateStateData {
pub fn new(metrics_registry: &mut Registry) -> Self { pub fn new(metrics_registry: &mut Registry) -> Self {
Self { Self {
latest_completed_slot: None, latest_completed_slot: None,
@ -130,6 +137,20 @@ impl AggregateState {
} }
} }
pub struct AggregateState {
pub data: RwLock<AggregateStateData>,
pub api_update_tx: Sender<AggregationEvent>,
}
impl AggregateState {
pub fn new(update_tx: Sender<AggregationEvent>, metrics_registry: &mut Registry) -> Self {
Self {
data: RwLock::new(AggregateStateData::new(metrics_registry)),
api_update_tx: update_tx,
}
}
}
/// Accumulator messages coming from Pythnet validators. /// Accumulator messages coming from Pythnet validators.
/// ///
/// The validators writes the accumulator messages using Borsh with /// The validators writes the accumulator messages using Borsh with
@ -177,124 +198,220 @@ const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
/// 10 slots is almost 5 seconds. /// 10 slots is almost 5 seconds.
const READINESS_MAX_ALLOWED_SLOT_LAG: Slot = 10; const READINESS_MAX_ALLOWED_SLOT_LAG: Slot = 10;
/// Stores the update data in the store #[async_trait::async_trait]
#[tracing::instrument(skip(state, update))] pub trait Aggregates
pub async fn store_update(state: &State, update: Update) -> Result<()> { where
// The slot that the update is originating from. It should be available Self: Cache,
// in all the updates. Self: Benchmarks,
let slot = match update { Self: PriceFeedMeta,
Update::Vaa(update_vaa) => { {
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>( fn subscribe(&self) -> Receiver<AggregationEvent>;
update_vaa.as_ref(), async fn is_ready(&self) -> bool;
)?; async fn store_update(&self, update: Update) -> Result<()>;
match WormholeMessage::try_from_bytes(vaa.payload)?.payload { async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier>;
WormholePayload::Merkle(proof) => { async fn get_price_feeds_with_update_data(
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof."); &self,
price_ids: &[PriceIdentifier],
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>;
}
store_wormhole_merkle_verified_message( /// Allow downcasting State into CacheState for functions that depend on the `Cache` service.
state, impl<'a> From<&'a State> for &'a AggregateState {
proof.clone(), fn from(state: &'a State) -> &'a AggregateState {
update_vaa.to_owned(), &state.aggregates
) }
.await?; }
state #[async_trait::async_trait]
.aggregate_state impl<T> Aggregates for T
.write() where
.await for<'a> &'a T: Into<&'a AggregateState>,
.metrics T: Sync,
.observe(proof.slot, metrics::Event::Vaa); T: Send,
T: Cache,
proof.slot T: Benchmarks,
} T: PriceFeedMeta,
} {
} fn subscribe(&self) -> Receiver<AggregationEvent> {
Update::AccumulatorMessages(accumulator_messages) => { self.into().api_update_tx.subscribe()
let slot = accumulator_messages.slot;
tracing::info!(slot = slot, "Storing Accumulator Messages.");
state
.store_accumulator_messages(accumulator_messages)
.await?;
state
.aggregate_state
.write()
.await
.metrics
.observe(slot, metrics::Event::AccumulatorMessages);
slot
}
};
// Update the aggregate state with the latest observed slot
{
let mut aggregate_state = state.aggregate_state.write().await;
aggregate_state.latest_observed_slot = aggregate_state
.latest_observed_slot
.map(|latest| latest.max(slot))
.or(Some(slot));
} }
let accumulator_messages = state.fetch_accumulator_messages(slot).await?; /// Stores the update data in the store
let wormhole_merkle_state = state.fetch_wormhole_merkle_state(slot).await?; #[tracing::instrument(skip(self, update))]
async fn store_update(&self, update: Update) -> Result<()> {
// The slot that the update is originating from. It should be available
// in all the updates.
let slot = match update {
Update::Vaa(update_vaa) => {
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(
update_vaa.as_ref(),
)?;
match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
WormholePayload::Merkle(proof) => {
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
let (accumulator_messages, wormhole_merkle_state) = store_wormhole_merkle_verified_message(
match (accumulator_messages, wormhole_merkle_state) { self,
(Some(accumulator_messages), Some(wormhole_merkle_state)) => { proof.clone(),
(accumulator_messages, wormhole_merkle_state) update_vaa.to_owned(),
)
.await?;
self.into()
.data
.write()
.await
.metrics
.observe(proof.slot, metrics::Event::Vaa);
proof.slot
}
}
}
Update::AccumulatorMessages(accumulator_messages) => {
let slot = accumulator_messages.slot;
tracing::info!(slot = slot, "Storing Accumulator Messages.");
self.store_accumulator_messages(accumulator_messages)
.await?;
self.into()
.data
.write()
.await
.metrics
.observe(slot, metrics::Event::AccumulatorMessages);
slot
} }
_ => return Ok(()),
}; };
tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update."); // Update the aggregate state with the latest observed slot
{
// Once the accumulator reaches a complete state for a specific slot let mut aggregate_state = self.into().data.write().await;
// we can build the message states aggregate_state.latest_observed_slot = aggregate_state
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?; .latest_observed_slot
.map(|latest| latest.max(slot))
let message_state_keys = message_states .or(Some(slot));
.iter()
.map(|message_state| message_state.key())
.collect::<HashSet<_>>();
tracing::info!(len = message_states.len(), "Storing Message States.");
state.store_message_states(message_states).await?;
// Update the aggregate state
let mut aggregate_state = state.aggregate_state.write().await;
// Send update event to subscribers. We are purposefully ignoring the result
// because there might be no subscribers.
let _ = match aggregate_state.latest_completed_slot {
None => {
aggregate_state.latest_completed_slot.replace(slot);
state.api_update_tx.send(AggregationEvent::New { slot })
} }
Some(latest) if slot > latest => {
state.prune_removed_keys(message_state_keys).await; let accumulator_messages = self.fetch_accumulator_messages(slot).await?;
aggregate_state.latest_completed_slot.replace(slot); let wormhole_merkle_state = self.fetch_wormhole_merkle_state(slot).await?;
state.api_update_tx.send(AggregationEvent::New { slot })
let (accumulator_messages, wormhole_merkle_state) =
match (accumulator_messages, wormhole_merkle_state) {
(Some(accumulator_messages), Some(wormhole_merkle_state)) => {
(accumulator_messages, wormhole_merkle_state)
}
_ => return Ok(()),
};
tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
// Once the accumulator reaches a complete state for a specific slot
// we can build the message states
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
let message_state_keys = message_states
.iter()
.map(|message_state| message_state.key())
.collect::<HashSet<_>>();
tracing::info!(len = message_states.len(), "Storing Message States.");
self.store_message_states(message_states).await?;
// Update the aggregate state
let mut aggregate_state = self.into().data.write().await;
// Send update event to subscribers. We are purposefully ignoring the result
// because there might be no subscribers.
let _ = match aggregate_state.latest_completed_slot {
None => {
aggregate_state.latest_completed_slot.replace(slot);
self.into()
.api_update_tx
.send(AggregationEvent::New { slot })
}
Some(latest) if slot > latest => {
self.prune_removed_keys(message_state_keys).await;
aggregate_state.latest_completed_slot.replace(slot);
self.into()
.api_update_tx
.send(AggregationEvent::New { slot })
}
_ => self
.into()
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot }),
};
aggregate_state.latest_completed_slot = aggregate_state
.latest_completed_slot
.map(|latest| latest.max(slot))
.or(Some(slot));
aggregate_state
.latest_completed_update_at
.replace(Instant::now());
aggregate_state
.metrics
.observe(slot, metrics::Event::CompletedUpdate);
Ok(())
}
async fn get_price_feeds_with_update_data(
&self,
price_ids: &[PriceIdentifier],
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> {
match get_verified_price_feeds(self, price_ids, request_time.clone()).await {
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
return Benchmarks::get_verified_price_feeds(self, price_ids, publish_time)
.await;
}
Err(e)
}
} }
_ => state }
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot }),
};
aggregate_state.latest_completed_slot = aggregate_state async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
.latest_completed_slot Cache::message_state_keys(self)
.map(|latest| latest.max(slot)) .await
.or(Some(slot)); .iter()
.map(|key| PriceIdentifier::new(key.feed_id))
.collect()
}
aggregate_state async fn is_ready(&self) -> bool {
.latest_completed_update_at let metadata = self.into().data.read().await;
.replace(Instant::now()); let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self)
.await
.unwrap();
aggregate_state let has_completed_recently = match metadata.latest_completed_update_at.as_ref() {
.metrics Some(latest_completed_update_time) => {
.observe(slot, metrics::Event::CompletedUpdate); latest_completed_update_time.elapsed() < READINESS_STALENESS_THRESHOLD
}
None => false,
};
Ok(()) let is_not_behind = match (
metadata.latest_completed_slot,
metadata.latest_observed_slot,
) {
(Some(latest_completed_slot), Some(latest_observed_slot)) => {
latest_observed_slot - latest_completed_slot <= READINESS_MAX_ALLOWED_SLOT_LAG
}
_ => false,
};
let is_metadata_loaded = !price_feeds_metadata.is_empty();
has_completed_recently && is_not_behind && is_metadata_loaded
}
} }
#[tracing::instrument(skip(accumulator_messages, wormhole_merkle_state))] #[tracing::instrument(skip(accumulator_messages, wormhole_merkle_state))]
@ -389,73 +506,12 @@ where
}) })
} }
pub async fn get_price_feeds_with_update_data<S>(
state: &S,
price_ids: &[PriceIdentifier],
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: Cache,
S: Benchmarks,
{
match get_verified_price_feeds(state, price_ids, request_time.clone()).await {
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
return state
.get_verified_price_feeds(price_ids, publish_time)
.await;
}
Err(e)
}
}
}
pub async fn get_price_feed_ids<S>(state: &S) -> HashSet<PriceIdentifier>
where
S: Cache,
{
state
.message_state_keys()
.await
.iter()
.map(|key| PriceIdentifier::new(key.feed_id))
.collect()
}
pub async fn is_ready(state: &State) -> bool {
let metadata = state.aggregate_state.read().await;
let price_feeds_metadata = state.price_feed_meta.data.read().await;
let has_completed_recently = match metadata.latest_completed_update_at.as_ref() {
Some(latest_completed_update_time) => {
latest_completed_update_time.elapsed() < READINESS_STALENESS_THRESHOLD
}
None => false,
};
let is_not_behind = match (
metadata.latest_completed_slot,
metadata.latest_observed_slot,
) {
(Some(latest_completed_slot), Some(latest_observed_slot)) => {
latest_observed_slot - latest_completed_slot <= READINESS_MAX_ALLOWED_SLOT_LAG
}
_ => false,
};
let is_metadata_loaded = !price_feeds_metadata.is_empty();
has_completed_recently && is_not_behind && is_metadata_loaded
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use { use {
super::*, super::*,
crate::{ crate::{
api::types::PriceFeedMetadata, api::types::PriceFeedMetadata,
price_feeds_metadata::PriceFeedMeta,
state::test::setup_state, state::test::setup_state,
}, },
futures::future::join_all, futures::future::join_all,
@ -557,7 +613,7 @@ mod test {
} }
pub async fn store_multiple_concurrent_valid_updates(state: Arc<State>, updates: Vec<Update>) { pub async fn store_multiple_concurrent_valid_updates(state: Arc<State>, updates: Vec<Update>) {
let res = join_all(updates.into_iter().map(|u| store_update(&state, u))).await; let res = join_all(updates.into_iter().map(|u| (&state).store_update(u))).await;
// Check that all store_update calls succeeded // Check that all store_update calls succeeded
assert!(res.into_iter().all(|r| r.is_ok())); assert!(res.into_iter().all(|r| r.is_ok()));
} }
@ -583,19 +639,19 @@ mod test {
// Check the price ids are stored correctly // Check the price ids are stored correctly
assert_eq!( assert_eq!(
get_price_feed_ids(&*state).await, (&*state).get_price_feed_ids().await,
vec![PriceIdentifier::new([100; 32])].into_iter().collect() vec![PriceIdentifier::new([100; 32])].into_iter().collect()
); );
// Check get_price_feeds_with_update_data retrieves the correct // Check get_price_feeds_with_update_data retrieves the correct
// price feed with correct update data. // price feed with correct update data.
let price_feeds_with_update_data = get_price_feeds_with_update_data( let price_feeds_with_update_data = (&*state)
&*state, .get_price_feeds_with_update_data(
&[PriceIdentifier::new([100; 32])], &[PriceIdentifier::new([100; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
price_feeds_with_update_data.price_feeds, price_feeds_with_update_data.price_feeds,
@ -708,7 +764,7 @@ mod test {
// Check the price ids are stored correctly // Check the price ids are stored correctly
assert_eq!( assert_eq!(
get_price_feed_ids(&*state).await, (&*state).get_price_feed_ids().await,
vec![ vec![
PriceIdentifier::new([100; 32]), PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]) PriceIdentifier::new([200; 32])
@ -718,13 +774,13 @@ mod test {
); );
// Check that price feed 2 exists // Check that price feed 2 exists
assert!(get_price_feeds_with_update_data( assert!((&*state)
&*state, .get_price_feeds_with_update_data(
&[PriceIdentifier::new([200; 32])], &[PriceIdentifier::new([200; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
.await .await
.is_ok()); .is_ok());
// Now send an update with only price feed 1 (without price feed 2) // Now send an update with only price feed 1 (without price feed 2)
// and make sure that price feed 2 is not stored anymore. // and make sure that price feed 2 is not stored anymore.
@ -745,17 +801,17 @@ mod test {
// Check that price feed 2 does not exist anymore // Check that price feed 2 does not exist anymore
assert_eq!( assert_eq!(
get_price_feed_ids(&*state).await, (&*state).get_price_feed_ids().await,
vec![PriceIdentifier::new([100; 32]),].into_iter().collect() vec![PriceIdentifier::new([100; 32]),].into_iter().collect()
); );
assert!(get_price_feeds_with_update_data( assert!((&*state)
&*state, .get_price_feeds_with_update_data(
&[PriceIdentifier::new([200; 32])], &[PriceIdentifier::new([200; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
.await .await
.is_err()); .is_err());
} }
#[tokio::test] #[tokio::test]
@ -791,13 +847,13 @@ mod test {
MockClock::advance(Duration::from_secs(1)); MockClock::advance(Duration::from_secs(1));
// Get the price feeds with update data // Get the price feeds with update data
let price_feeds_with_update_data = get_price_feeds_with_update_data( let price_feeds_with_update_data = (&*state)
&*state, .get_price_feeds_with_update_data(
&[PriceIdentifier::new([100; 32])], &[PriceIdentifier::new([100; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
.await .await
.unwrap(); .unwrap();
// check received_at is correct // check received_at is correct
assert_eq!(price_feeds_with_update_data.price_feeds.len(), 1); assert_eq!(price_feeds_with_update_data.price_feeds.len(), 1);
@ -817,13 +873,13 @@ mod test {
.unwrap(); .unwrap();
// Check the state is ready // Check the state is ready
assert!(is_ready(&state).await); assert!((&state).is_ready().await);
// Advance the clock to make the prices stale // Advance the clock to make the prices stale
MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD); MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
MockClock::advance(READINESS_STALENESS_THRESHOLD); MockClock::advance(READINESS_STALENESS_THRESHOLD);
// Check the state is not ready // Check the state is not ready
assert!(!is_ready(&state).await); assert!(!(&state).is_ready().await);
} }
/// Test that the state retains the latest slots upon cache eviction. /// Test that the state retains the latest slots upon cache eviction.
@ -866,16 +922,16 @@ mod test {
// Check the last 100 slots are retained // Check the last 100 slots are retained
for slot in 900..1000 { for slot in 900..1000 {
let price_feeds_with_update_data = get_price_feeds_with_update_data( let price_feeds_with_update_data = (&*state)
&*state, .get_price_feeds_with_update_data(
&[ &[
PriceIdentifier::new([100; 32]), PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]), PriceIdentifier::new([200; 32]),
], ],
RequestTime::FirstAfter(slot as i64), RequestTime::FirstAfter(slot as i64),
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(price_feeds_with_update_data.price_feeds.len(), 2); assert_eq!(price_feeds_with_update_data.price_feeds.len(), 2);
assert_eq!(price_feeds_with_update_data.price_feeds[0].slot, Some(slot)); assert_eq!(price_feeds_with_update_data.price_feeds[0].slot, Some(slot));
assert_eq!(price_feeds_with_update_data.price_feeds[1].slot, Some(slot)); assert_eq!(price_feeds_with_update_data.price_feeds[1].slot, Some(slot));
@ -883,16 +939,16 @@ mod test {
// Check nothing else is retained // Check nothing else is retained
for slot in 0..900 { for slot in 0..900 {
assert!(get_price_feeds_with_update_data( assert!((&*state)
&*state, .get_price_feeds_with_update_data(
&[ &[
PriceIdentifier::new([100; 32]), PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]) PriceIdentifier::new([200; 32])
], ],
RequestTime::FirstAfter(slot as i64), RequestTime::FirstAfter(slot as i64),
) )
.await .await
.is_err()); .is_err());
} }
} }
} }

View File

@ -1,14 +1,14 @@
//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates. //! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
use { use {
super::State, super::{
crate::{
aggregate::{ aggregate::{
PriceFeedsWithUpdateData, PriceFeedsWithUpdateData,
UnixTimestamp, UnixTimestamp,
}, },
api::types::PriceUpdate, State,
}, },
crate::api::types::PriceUpdate,
anyhow::Result, anyhow::Result,
base64::{ base64::{
engine::general_purpose::STANDARD as base64_standard_engine, engine::general_purpose::STANDARD as base64_standard_engine,
@ -69,6 +69,7 @@ impl<'a> From<&'a State> for &'a BenchmarksState {
} }
} }
#[async_trait::async_trait]
pub trait Benchmarks { pub trait Benchmarks {
async fn get_verified_price_feeds( async fn get_verified_price_feeds(
&self, &self,
@ -77,6 +78,7 @@ pub trait Benchmarks {
) -> Result<PriceFeedsWithUpdateData>; ) -> Result<PriceFeedsWithUpdateData>;
} }
#[async_trait::async_trait]
impl<T> Benchmarks for T impl<T> Benchmarks for T
where where
for<'a> &'a T: Into<&'a BenchmarksState>, for<'a> &'a T: Into<&'a BenchmarksState>,

View File

@ -1,6 +1,6 @@
use { use {
super::State, super::State,
crate::aggregate::{ crate::state::aggregate::{
wormhole_merkle::WormholeMerkleState, wormhole_merkle::WormholeMerkleState,
AccumulatorMessages, AccumulatorMessages,
ProofSet, ProofSet,
@ -132,16 +132,10 @@ impl<'a> From<&'a State> for &'a CacheState {
} }
} }
#[async_trait::async_trait]
pub trait Cache { pub trait Cache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>; async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>); async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>>;
async fn store_accumulator_messages( async fn store_accumulator_messages(
&self, &self,
accumulator_messages: AccumulatorMessages, accumulator_messages: AccumulatorMessages,
@ -152,8 +146,16 @@ pub trait Cache {
wormhole_merkle_state: WormholeMerkleState, wormhole_merkle_state: WormholeMerkleState,
) -> Result<()>; ) -> Result<()>;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>; async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>>;
} }
#[async_trait::async_trait]
impl<T> Cache for T impl<T> Cache for T
where where
for<'a> &'a T: Into<&'a CacheState>, for<'a> &'a T: Into<&'a CacheState>,
@ -322,9 +324,9 @@ async fn retrieve_message_state(
mod test { mod test {
use { use {
super::*, super::*,
crate::{ crate::state::{
aggregate::wormhole_merkle::WormholeMerkleMessageProof, aggregate::wormhole_merkle::WormholeMerkleMessageProof,
state::test::setup_state, test::setup_state,
}, },
pyth_sdk::UnixTimestamp, pyth_sdk::UnixTimestamp,
pythnet_sdk::{ pythnet_sdk::{

View File

@ -13,10 +13,10 @@ COPY --chown=1000:1000 price_service/client/js price_service/client/js
COPY --chown=1000:1000 price_service/sdk/js price_service/sdk/js COPY --chown=1000:1000 price_service/sdk/js price_service/sdk/js
COPY --chown=1000:1000 target_chains/solana/sdk/js target_chains/solana/sdk/js COPY --chown=1000:1000 target_chains/solana/sdk/js target_chains/solana/sdk/js
COPY --chown=1000:1000 price_pusher price_pusher COPY --chown=1000:1000 apps/price_pusher apps/price_pusher
RUN npx lerna run build --scope="@pythnetwork/price-pusher" --include-dependencies RUN npx lerna run build --scope="@pythnetwork/price-pusher" --include-dependencies
WORKDIR /home/node/price_pusher WORKDIR /home/node/apps/price_pusher
ENTRYPOINT [ "npm", "run", "start" ] ENTRYPOINT [ "npm", "run", "start" ]

View File

@ -10,7 +10,7 @@ By default, Pyth does not automatically update the on-chain price every time the
instead, anyone can permissionlessly update the on-chain price prior to using it. instead, anyone can permissionlessly update the on-chain price prior to using it.
For more information please refer to [this document](https://docs.pyth.network/documentation/how-pyth-works). For more information please refer to [this document](https://docs.pyth.network/documentation/how-pyth-works).
Protocols integrating with can update the on-chain Pyth prices in two different ways. Protocols integrating with Pyth can update the on-chain Pyth prices in two different ways.
The first approach is on-demand updates: package a Pyth price update together with each transaction that depends on it. The first approach is on-demand updates: package a Pyth price update together with each transaction that depends on it.
On-demand updates minimize latency and are more gas efficient, as prices are only updated on-chain when they are needed. On-demand updates minimize latency and are more gas efficient, as prices are only updated on-chain when they are needed.
@ -87,7 +87,7 @@ npm install
npx lerna run build --scope @pythnetwork/price-pusher --include-dependencies npx lerna run build --scope @pythnetwork/price-pusher --include-dependencies
# Navigate to the price_pusher folder # Navigate to the price_pusher folder
cd price_pusher cd apps/price_pusher
# For EVM # For EVM
npm run start -- evm --endpoint wss://example-rpc.com \ npm run start -- evm --endpoint wss://example-rpc.com \
@ -145,6 +145,33 @@ npm run start -- near \
[--pushing-frequency 10] \ [--pushing-frequency 10] \
[--polling-frequency 5] [--polling-frequency 5]
# For Solana, using Jito (recommended)
npm run start -- solana \
--endpoint https://api.mainnet-beta.solana.com \
--keypair-file ./id.json \
--shard-id 1 \
--jito-endpoint mainnet.block-engine.jito.wtf \
--jito-keypair-file ./jito.json \
--jito-tip-lamports 100000 \
--jito-bundle-size 5 \
--price-config-file ./price-config.yaml \
--price-service-endpoint https://hermes.pyth.network/ \
--pyth-contract-address pythWSnswVUd12oZpeFP8e9CVaEqJg25g1Vtc2biRsT \
--pushing-frequency 30 \
[--polling-frequency 5]
# For Solana, using Solana RPC
npm run start -- solana \
--endpoint https://api.devnet.solana.com \
--keypair-file ./id.json \
--shard-id 1 \
--price-config-file ./price-config.yaml \
--price-service-endpoint https://hermes.pyth.network/ \
--pyth-contract-address pythWSnswVUd12oZpeFP8e9CVaEqJg25g1Vtc2biRsT \
--pushing-frequency 30 \
[--polling-frequency 5]
# Or, run the price pusher docker image instead of building from the source # Or, run the price pusher docker image instead of building from the source
docker run public.ecr.aws/pyth-network/xc-price-pusher:v<version> -- <above-arguments> docker run public.ecr.aws/pyth-network/xc-price-pusher:v<version> -- <above-arguments>

View File

@ -0,0 +1,13 @@
{
"endpoint": "https://api.mainnet-beta.solana.com",
"keypair-file": "./id.json",
"shard-id": 1,
"jito-endpoint": "mainnet.block-engine.jito.wtf",
"jito-keypair-file": "./jito.json",
"jito-tip-lamports": "100000",
"jito-bundle-size": "5",
"price-config-file": "./price-config.yaml",
"price-service-endpoint": "https://hermes.pyth.network/",
"pyth-contract-address": "pythWSnswVUd12oZpeFP8e9CVaEqJg25g1Vtc2biRsT",
"pushing-frequency": "30"
}

View File

@ -0,0 +1,9 @@
{
"endpoint": "https://api.devnet.solana.com",
"keypair-file": "./id.json",
"shard-id": 1,
"price-config-file": "./price-config.yaml",
"price-service-endpoint": "https://hermes.pyth.network/",
"pyth-contract-address": "pythWSnswVUd12oZpeFP8e9CVaEqJg25g1Vtc2biRsT",
"pushing-frequency": "30"
}

View File

@ -1,6 +1,6 @@
{ {
"name": "@pythnetwork/price-pusher", "name": "@pythnetwork/price-pusher",
"version": "6.5.0", "version": "6.7.2",
"description": "Pyth Price Pusher", "description": "Pyth Price Pusher",
"homepage": "https://pyth.network", "homepage": "https://pyth.network",
"main": "lib/index.js", "main": "lib/index.js",
@ -14,7 +14,7 @@
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/pyth-network/pyth-crosschain", "url": "https://github.com/pyth-network/pyth-crosschain",
"directory": "price_pusher" "directory": "apps/price_pusher"
}, },
"publishConfig": { "publishConfig": {
"access": "public" "access": "public"

View File

@ -52,6 +52,20 @@ export default {
required: false, required: false,
default: 5, default: 5,
} as Options, } as Options,
"gas-limit": {
description: "Gas limit for the transaction",
type: "number",
required: false,
} as Options,
"update-fee-multiplier": {
description:
"Multiplier for the fee to update the price. It is useful in networks " +
"such as Hedera where setting on-chain getUpdateFee as the transaction value " +
"won't work. Default to 1",
type: "number",
required: false,
default: 1,
} as Options,
...options.priceConfigFile, ...options.priceConfigFile,
...options.priceServiceEndpoint, ...options.priceServiceEndpoint,
...options.mnemonicFile, ...options.mnemonicFile,
@ -73,6 +87,8 @@ export default {
txSpeed, txSpeed,
overrideGasPriceMultiplier, overrideGasPriceMultiplier,
overrideGasPriceMultiplierCap, overrideGasPriceMultiplierCap,
gasLimit,
updateFeeMultiplier,
} = argv; } = argv;
const priceConfigs = readPriceConfigFile(priceConfigFile); const priceConfigs = readPriceConfigFile(priceConfigFile);
@ -119,6 +135,8 @@ export default {
pythContractFactory, pythContractFactory,
overrideGasPriceMultiplier, overrideGasPriceMultiplier,
overrideGasPriceMultiplierCap, overrideGasPriceMultiplierCap,
updateFeeMultiplier,
gasLimit,
gasStation gasStation
); );

Some files were not shown because too many files have changed in this diff Show More