solana-bankingstage-dashboard/transaction_database.py

115 lines
3.8 KiB
Python

import postgres_connection
import json
def run_query(transaction_row_limit=50, filter_txsig=None):
maprows = postgres_connection.query(
"""
SELECT * FROM (
SELECT
signature,
( SELECT count(distinct slot) FROM banking_stage_results_2.transaction_slot WHERE transaction_id=tx_slot.transaction_id ) AS num_relative_slots,
(
SELECT ARRAY_AGG(json_build_object('slot', tx_slot.slot, 'error', err.error_text, 'count', count)::text)
FROM banking_stage_results_2.errors err
WHERE err.error_code=tx_slot.error_code
) AS all_errors,
( txi is not null ) AS was_included_in_block,
txi.cu_requested,
txi.prioritization_fees,
utc_timestamp,
tx_slot.transaction_id
FROM banking_stage_results_2.transaction_slot tx_slot
INNER JOIN banking_stage_results_2.transactions txs ON txs.transaction_id=tx_slot.transaction_id
LEFT JOIN banking_stage_results_2.transaction_infos txi ON txi.transaction_id=tx_slot.transaction_id
WHERE true
AND (%s or signature = %s)
) AS data
-- transaction_id is required as tie breaker
ORDER BY utc_timestamp DESC, transaction_id DESC
LIMIT %s
""", [
filter_txsig is None, filter_txsig,
transaction_row_limit,
])
for index, row in enumerate(maprows):
row['pos'] = index + 1
map_jsons_in_row(row)
return maprows
def query_transactions_by_address(account_key: str, transaction_row_limit=100):
maprows = postgres_connection.query(
"""
SELECT * FROM (
SELECT
amt.transaction_id,
signature,
( txi is not null ) AS was_included_in_block,
txi.cu_requested,
txi.prioritization_fees,
tx_slot.min_utc_timestamp AS utc_timestamp
FROM banking_stage_results_2.accounts_map_transaction amt
INNER JOIN banking_stage_results_2.accounts acc ON acc.acc_id=amt.acc_id
INNER JOIN banking_stage_results_2.transactions txs ON txs.transaction_id=amt.transaction_id
LEFT JOIN banking_stage_results_2.transaction_infos txi ON txi.transaction_id=amt.transaction_id
LEFT JOIN (
SELECT
transaction_id,
min(utc_timestamp) AS min_utc_timestamp
FROM banking_stage_results_2.transaction_slot tx_slot
GROUP BY transaction_id
) tx_slot ON tx_slot.transaction_id=amt.transaction_id
WHERE account_key = %s
) AS data
ORDER BY transaction_id DESC
LIMIT %s
""", [
account_key,
transaction_row_limit,
])
for index, row in enumerate(maprows):
row['pos'] = index + 1
return maprows
# may return multiple rows
def search_transaction_by_sig(tx_sig: str):
maprows = run_query(transaction_row_limit=10, filter_txsig=tx_sig)
return maprows
# return (rows, is_limit_exceeded)
def search_transactions_by_address(account_key: str) -> (list, bool):
maprows = query_transactions_by_address(transaction_row_limit=101, account_key=account_key)
if len(maprows) == 101:
print("limit exceeded while searching for transactions by address")
return maprows, True
return maprows, False
def map_jsons_in_row(row):
errors = []
if row["all_errors"] is None:
row["all_errors"] = []
return
for errors_json in row["all_errors"]:
errors.append(json.loads(errors_json))
row["errors_array"] = errors
def main():
run_query()
if __name__=="__main__":
main()