Merge pull request #17 from blockworks-foundation/changing_schema_and_adding_accounts_data

updated UI to new schema
This commit is contained in:
galactus 2023-12-12 09:59:52 +01:00 committed by GitHub
commit 35669eaf41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 260 additions and 239 deletions

View File

@ -64,3 +64,15 @@ Open Firefox Browser and navigate to ...
| Hard Limit of HTTP Requests | fly.io | hard_limit | fly.toml |
| Python HTTP Server | gunicorn | --workers, --threads | Dockerfile |
### Data Model
Conventions:
| Table Name | Alias |
|------------------|---------|
| transaction_slot | tx_slot |
| accounts_map_blocks | amb |
| transaction_infos | txi |
| blocks | blocks |
| accounts_map_transaction | amt |

44
app.py
View File

@ -8,6 +8,7 @@ import recent_blocks_database
import block_details_database
import config
import locale
from datetime import datetime
#
# MAIN
@ -46,7 +47,7 @@ def dashboard():
def tx_errors():
this_config = config.get_config()
start = time.time()
maprows = list(transaction_database.run_query())
maprows = list(transaction_database.run_query(transaction_row_limit=50))
elapsed = time.time() - start
if elapsed > .5:
print("transaction_database.RunQuery() took", elapsed, "seconds")
@ -89,16 +90,24 @@ def is_slot_number(raw_string):
return re.fullmatch("[0-9,]+", raw_string) is not None
def is_block_hash(raw_string):
# regex is not perfect - feel free to improve
# used for blockhash AND account pubkey
def is_b58_44(raw_string):
    """Return True for a 43-44 character base58 string (blockhash or account pubkey).

    Uses the actual base58 alphabet, which excludes the visually ambiguous
    characters 0, O, I and l that the previous [0-9a-zA-Z] class wrongly admitted.
    """
    return re.fullmatch(r"[1-9A-HJ-NP-Za-km-z]{43,44}", raw_string) is not None
def is_tx_sig(raw_string):
# regex is not perfect - feel free to improve
if is_block_hash(raw_string):
if is_b58_44(raw_string):
return False
return re.fullmatch("[0-9a-zA-Z]{64,100}", raw_string) is not None
return re.fullmatch("[0-9a-zA-Z]{86,88}", raw_string) is not None
# account address
# if NOT blockhash
def is_account_key(raw_string):
    """Return True for a plausible base58 account pubkey (32-44 characters).

    Uses the actual base58 alphabet (no 0, O, I, l), matching is_b58_44's
    validation style instead of the looser [0-9a-zA-Z] class.
    """
    return re.fullmatch(r"[1-9A-HJ-NP-Za-km-z]{32,44}", raw_string) is not None
@webapp.route('/search', methods=["GET", "POST"])
@ -121,18 +130,28 @@ def search():
return render_template('_blockslist.html', config=this_config, blocks=maprows)
else:
return render_template('_search_noresult.html', config=this_config)
elif is_block_hash(search_string):
is_blockhash = block_details_database.is_matching_blockhash(search_string)
if is_blockhash:
print("blockhash search=", search_string)
maprows = list(recent_blocks_database.find_block_by_blockhash(search_string))
if len(maprows):
return render_template('_blockslist.html', config=this_config, blocks=maprows)
else:
return render_template('_search_noresult.html', config=this_config)
elif not is_blockhash and is_b58_44(search_string):
print("account address search=", search_string)
(maprows, is_limit_exceeded) = list(transaction_database.query_transactions_by_address(search_string))
if len(maprows):
return render_template('_txlist.html', config=this_config, transactions=maprows, limit_exceeded=is_limit_exceeded)
else:
return render_template('_search_noresult.html', config=this_config)
elif is_tx_sig(search_string):
print("txsig search=", search_string)
maprows = list(transaction_database.find_transaction_by_sig(search_string))
if len(maprows):
return render_template('_txlist.html', config=this_config, transactions=maprows)
return render_template('_txlist.html', config=this_config, transactions=maprows, limit_exceeded=False)
else:
return render_template('_search_noresult.html', config=this_config)
else:
@ -204,3 +223,14 @@ def mapcount_filter(number: int):
print("FIELD_ERROR in template filter")
return "FIELD_ERROR"
@webapp.template_filter('timestamp')
def timestamp_filter(dt: datetime):
    """Jinja template filter: format a datetime like 'Tue 12 09:59:52.512700'.

    Returns None unchanged so templates can render a placeholder, and the
    sentinel string 'FIELD_ERROR' when the value cannot be formatted
    (same convention as mapcount_filter).
    """
    if dt is None:
        return None
    try:
        # weekday abbrev, day of month, time with microseconds
        return dt.strftime('%a %d %H:%M:%S.%f')
    except (TypeError, AttributeError):
        # AttributeError covers non-datetime values (no .strftime), which the
        # previous TypeError-only handler let escape and crash the template
        print("FIELD_ERROR in template filter")
        return "FIELD_ERROR"

View File

@ -12,13 +12,16 @@ def find_block_by_slotnumber(slot_number: int):
leader_identity,
processed_transactions,
successful_transactions,
banking_stage_errors,
(
SELECT
count(tx_slot.error_code)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.slot=blocks.slot
) AS banking_stage_errors,
total_cu_used,
total_cu_requested,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
supp_infos
FROM banking_stage_results.blocks
FROM banking_stage_results_2.blocks
-- this critera uses index idx_blocks_slot
WHERE slot = %s
) AS data
@ -27,27 +30,53 @@ def find_block_by_slotnumber(slot_number: int):
assert len(maprows) <= 1, "Slot is primary key - find zero or one"
for row in maprows:
# format see BankingStageErrorsTrackingSidecar -> block_info.rs
# parse (k:GubTBrbgk9JwkwX1FkXvsrF1UC2AP7iTgg8SGtgH14QE, cu_req:600000, cu_con:2243126)
slot = row["slot"]
parsed_accounts = json.loads(row["heavily_writelocked_accounts"])
row['supp_infos'] =json.loads(row['supp_infos'])
parsed_accounts.sort(key=lambda acc: int(acc['cu_consumed']), reverse=True)
row["heavily_writelocked_accounts_parsed"] = parsed_accounts
# TODO need new parser
row['supp_infos'] = json.loads(row['supp_infos'])
parsed_accounts = json.loads(row["heavily_readlocked_accounts"])
parsed_accounts.sort(key=lambda acc: int(acc['cu_consumed']), reverse=True)
row["heavily_readlocked_accounts_parsed"] = parsed_accounts
# note: sort order is undefined
accountinfos = (
postgres_connection.query(
"""
SELECT
amb.*,
acc.account_key
FROM banking_stage_results_2.accounts_map_blocks amb
INNER JOIN banking_stage_results_2.accounts acc ON acc.acc_id=amb.acc_id
WHERE slot = %s
""", args=[slot])
)
account_info_expanded = []
for account_info in accountinfos:
prio_fee_data = json.loads(account_info['prioritization_fees_info'])
info = {
'slot': account_info['slot'],
'key': account_info['account_key'],
'is_write_locked': account_info['is_write_locked'],
'cu_requested': account_info['total_cu_requested'],
'cu_consumed': account_info['total_cu_consumed'],
'min_pf': prio_fee_data['min'],
'median_pf': prio_fee_data['med'],
'max_pf': prio_fee_data['max']
}
account_info_expanded.append(info)
account_info_expanded.sort(key=lambda acc: int(acc['cu_consumed']), reverse=True)
row["heavily_writelocked_accounts_parsed"] = [acc for acc in account_info_expanded if acc['is_write_locked'] is True]
row["heavily_readlocked_accounts_parsed"] = [acc for acc in account_info_expanded if acc['is_write_locked'] is False]
return maprows
def is_matching_blockhash(block_hash):
    """Return True iff a block with the given block_hash exists in the database.

    Used by the search route to decide whether a 43-44 char base58 string
    should be treated as a blockhash or as an account address.
    """
    maprows = postgres_connection.query(
        """
SELECT 1 FROM banking_stage_results_2.blocks
WHERE block_hash = %s
""", [block_hash])
    return len(maprows) > 0
# parse (k:GubTBrbgk9JwkwX1FkXvsrF1UC2AP7iTgg8SGtgH14QE, cu_req:600000, cu_con:2243126)
# def parse_accounts(acc):
# groups = re.match(r"\((k:)(?P<k>[a-zA-Z0-9]+)(, cu_req:)(?P<cu_req>[0-9]+)(, cu_con:)(?P<cu_con>[0-9]+)\)", acc)
# return (groups.group('k'), groups.group('cu_req'), groups.group('cu_con'))
def main():
find_block_by_slotnumber(226352855)

View File

@ -62,8 +62,8 @@ def query(statement, args=[]):
keys = [k[0] for k in cursor.description]
maprows = [dict(zip(keys, row)) for row in cursor]
except Exception as ex:
print("Exception executing query:", ex)
return []
print("Exception executing statement:", ex, statement)
raise ex
if elapsed_total > .2:
print("Database Query took", elapsed_total, "secs", "(", elapsed_connect, ")")

View File

@ -13,7 +13,6 @@ def format_width_percentage(x):
def calc_figures(row):
successful_transactions = row['successful_transactions']
processed_transactions = row['processed_transactions']
banking_stage_errors = row['banking_stage_errors'] or 0
txerrors = processed_transactions - successful_transactions
row['txerrors'] = txerrors
@ -52,7 +51,7 @@ def calc_bars(row):
row['hide_bar'] = True
def run_query(to_slot=None):
def run_query(to_slot=None, filter_slot=None, filter_blockhash=None):
maprows = postgres_connection.query(
"""
SELECT * FROM (
@ -60,95 +59,51 @@ def run_query(to_slot=None):
slot,
processed_transactions,
successful_transactions,
banking_stage_errors,
(
SELECT
count(tx_slot.error_code)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.slot=blocks.slot
) AS banking_stage_errors,
total_cu_used,
total_cu_requested,
supp_infos
FROM banking_stage_results.blocks
-- this critera uses index idx_blocks_slot_errors
WHERE
-- short circuit if true
(%s or slot <= %s)
FROM banking_stage_results_2.blocks
WHERE true
AND (%s or slot <= %s)
AND (%s or slot = %s)
AND (%s or block_hash = %s)
ORDER BY slot DESC
LIMIT 100
) AS data
""",
[to_slot is None, to_slot])
# print some samples
# for row in maprows[:3]:
# print(row)
# print("...")
[
to_slot is None, to_slot,
filter_slot is None, filter_slot,
filter_blockhash is None, filter_blockhash,
])
for row in maprows:
fixup_row(row)
calc_bars(row)
calc_figures(row)
row["prioritization_fees"] = json.loads(row['supp_infos'])
return maprows
def find_block_by_slotnumber(slot_number: int):
maprows = postgres_connection.query(
"""
SELECT * FROM (
SELECT
slot,
processed_transactions,
successful_transactions,
banking_stage_errors,
total_cu_used,
total_cu_requested,
supp_infos
FROM banking_stage_results.blocks
-- this critera uses index idx_blocks_slot
WHERE slot = %s
) AS data
""", args=[slot_number])
maprows = run_query(filter_slot=slot_number)
assert len(maprows) <= 1, "Slot is primary key - find zero or one"
for row in maprows:
fixup_row(row)
calc_bars(row)
calc_figures(row)
return maprows
def fixup_row(row):
    """Post-process a raw block row in place for rendering.

    Replaces a NULL banking_stage_errors count with 0 and parses the
    supp_infos JSON blob into the 'prioritization_fees' key.
    """
    errors = row['banking_stage_errors']
    row['banking_stage_errors'] = errors if errors else 0
    row['prioritization_fees'] = json.loads(row['supp_infos'])
def find_block_by_blockhash(block_hash: str):
maprows = postgres_connection.query(
"""
SELECT * FROM (
SELECT
slot,
processed_transactions,
successful_transactions,
banking_stage_errors,
total_cu_used,
total_cu_requested,
supp_infos
FROM banking_stage_results.blocks
-- uses index on primary key
WHERE block_hash = %s
) AS data
""", args=[block_hash])
maprows = run_query(filter_blockhash=block_hash)
assert len(maprows) <= 1, "Block hash is unique - find zero or one"
for row in maprows:
fixup_row(row)
calc_bars(row)
calc_figures(row)
print("found ", maprows, block_hash)
return maprows

View File

@ -11,19 +11,24 @@
</tr>
</thead>
<tbody class="list">
{% if limit_exceeded %}
<tr>
<td colspan="7" class="text-warning">Result was truncated</td>
</tr>
{% endif %}
{% for tx in transactions %}
<tr>
<td>{{ tx.pos }}</td>
{% if tx.is_executed %}
{% if tx.is_successful %}
<td><span class="badge bg-success-soft">Included</span></td>
{% elif not tx.is_executed %}
{% else %}
<td><span class="badge bg-warning-soft">Excluded</span></td>
{% endif %}
<td>
<div class="align-items-center">
<span class="font-monospace text-muted">
{{ tx.timestamp_formatted }}
{{ tx.utc_timestamp or '--' }}
</span>
</div>
</td>
@ -46,6 +51,13 @@
</td>
</tr>
{% endfor %}
{% if not tx.errors_array %}
<tr>
<td>
<span class="font-monospace text-muted">--</span>
</td>
</tr>
{% endif %}
</table>
</div>
</td>

View File

@ -67,7 +67,7 @@
</tr>
<tr>
<td class="w-100">UTC timestamp</td>
<td class="text-lg-end font-monospace"><span>{{ transaction.timestamp_formatted }}</span></td>
<td class="text-lg-end font-monospace"><span>{{ transaction.utc_timestamp | timestamp }}</span></td>
</tr>
</tbody>
</table>

View File

@ -1,40 +1,50 @@
import postgres_connection
import json
def run_query():
def run_query(transaction_row_limit=None, filter_txsig=None, filter_account_address=None):
maprows = postgres_connection.query(
"""
WITH tx_aggregated AS (
SELECT * FROM (
SELECT
signature as sig,
min(first_notification_slot) as min_slot,
ARRAY_AGG(errors) as all_errors
FROM banking_stage_results.transaction_infos
signature,
(
SELECT ARRAY_AGG(json_build_object('error', err.error_text, 'count', count)::text)
FROM banking_stage_results_2.transaction_slot tx_slot
INNER JOIN banking_stage_results_2.errors err ON err.error_code=tx_slot.error_code
WHERE tx_slot.transaction_id=txi.transaction_id
) AS all_errors,
is_successful,
processed_slot,
(
SELECT min(slot)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.transaction_id=txi.transaction_id
) AS first_notification_slot,
cu_requested,
prioritization_fees,
(
SELECT min(utc_timestamp)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.transaction_id=txi.transaction_id
) AS utc_timestamp
FROM banking_stage_results_2.transaction_infos txi
INNER JOIN banking_stage_results_2.transactions txs ON txs.transaction_id=txi.transaction_id
WHERE true
GROUP BY signature
ORDER BY min(utc_timestamp) DESC
LIMIT 50
)
SELECT
signature,
tx_aggregated.all_errors,
is_executed,
is_confirmed,
first_notification_slot,
cu_requested,
prioritization_fees,
utc_timestamp,
-- e.g. "OCT 17 12:29:17.5127"
to_char(utc_timestamp, 'MON DD HH24:MI:SS.MS') as timestamp_formatted
FROM banking_stage_results.transaction_infos txi
INNER JOIN tx_aggregated ON tx_aggregated.sig=txi.signature AND tx_aggregated.min_slot=txi.first_notification_slot
""")
# print some samples
# for row in maprows[:3]:
# print(row)
# print("...")
AND (%s or signature = %s)
AND (%s or txi.transaction_id in (
SELECT transaction_id
FROM banking_stage_results_2.accounts_map_transaction amt
INNER JOIN banking_stage_results_2.accounts acc ON acc.acc_id=amt.acc_id
WHERE account_key = %s
))
) AS data
ORDER BY processed_slot, utc_timestamp, signature DESC
LIMIT %s
""", [
filter_txsig is None, filter_txsig,
filter_account_address is None, filter_account_address,
transaction_row_limit or 50,
])
for index, row in enumerate(maprows):
row['pos'] = index + 1
@ -44,45 +54,32 @@ def run_query():
def find_transaction_by_sig(tx_sig: str):
maprows = postgres_connection.query(
"""
WITH tx_aggregated AS (
SELECT
signature as sig,
min(first_notification_slot) as min_slot,
ARRAY_AGG(errors) as all_errors
FROM banking_stage_results.transaction_infos
WHERE signature = %s
GROUP BY signature
)
SELECT
signature,
tx_aggregated.all_errors,
is_executed,
is_confirmed,
first_notification_slot,
cu_requested,
prioritization_fees,
utc_timestamp,
-- e.g. "OCT 17 12:29:17.5127"
to_char(utc_timestamp, 'MON DD HH24:MI:SS.MS') as timestamp_formatted
FROM banking_stage_results.transaction_infos txi
INNER JOIN tx_aggregated ON tx_aggregated.sig=txi.signature AND tx_aggregated.min_slot=txi.first_notification_slot
""", args=[tx_sig])
maprows = run_query(transaction_row_limit=10, filter_txsig=tx_sig)
assert len(maprows) <= 1, "Tx Sig is primary key - find zero or one"
assert len(maprows) <= 1, "Signature is primary key - find zero or one"
for row in maprows:
map_jsons_in_row(row)
return maprows
# return (rows, is_limit_exceeded)
def query_transactions_by_address(account_key: str) -> tuple[list, bool]:
    """Fetch transactions that touch the given account key.

    Queries up to 501 rows; receiving exactly 501 is taken as the signal
    that more rows exist, so is_limit_exceeded is True in that case.
    NOTE(review): all 501 rows are returned untrimmed when the limit is
    exceeded — presumably the template truncates; verify against caller.
    """
    maprows = run_query(transaction_row_limit=501, filter_account_address=account_key)
    if len(maprows) == 501:
        print("limit exceeded while searching for transactions by address")
        return maprows, True
    return maprows, False
def map_jsons_in_row(row):
errors = []
# flatmap postgres array of json strings which contain array (of errors, usually one)
if row["all_errors"] is None:
row["all_errors"] = []
return
for errors_json in row["all_errors"]:
for error in json.loads(errors_json):
errors.append(error)
# {"{\"error_text\" : \"TransactionError::AccountInUse\", \"count\" : 1}"}
errors.append(json.loads(errors_json))
row["errors_array"] = errors
def main():

View File

@ -8,103 +8,87 @@ def find_transaction_details_by_sig(tx_sig: str):
# transaction table primary key is used
maprows = postgres_connection.query(
"""
WITH tx_aggregated AS (
SELECT
signature as sig,
min(first_notification_slot) as min_slot,
ARRAY_AGG(errors) as all_errors
FROM banking_stage_results.transaction_infos
WHERE signature = %s
GROUP BY signature
)
SELECT
txi.transaction_id,
signature,
tx_aggregated.all_errors,
is_executed,
is_confirmed,
first_notification_slot,
is_successful,
processed_slot,
(
SELECT min(slot)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.transaction_id=txi.transaction_id
) AS first_notification_slot,
cu_requested,
prioritization_fees,
processed_slot,
to_char(utc_timestamp, 'MON DD HH24:MI:SS.MS') as timestamp_formatted,
accounts_used
FROM banking_stage_results.transaction_infos txi
INNER JOIN tx_aggregated ON tx_aggregated.sig=txi.signature AND tx_aggregated.min_slot=txi.first_notification_slot
(
SELECT min(utc_timestamp)
FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.transaction_id=txi.transaction_id
) AS utc_timestamp
FROM banking_stage_results_2.transaction_infos txi
INNER JOIN banking_stage_results_2.transactions txs ON txs.transaction_id=txi.transaction_id
WHERE signature=%s
""", args=[tx_sig])
assert len(maprows) <= 1, "Tx Sig is primary key - find zero or one"
for row in maprows:
transaction_database.map_jsons_in_row(row)
accounts = json.loads(row['accounts_used'])
row['writelocked_accounts'] = list(filter(lambda acc : acc['writable'], accounts))
row['readlocked_accounts'] = list(filter(lambda acc : not acc['writable'], accounts))
relevent_slots_dict = {row['first_notification_slot']}
for error in row['errors_array']:
relevent_slots_dict.add(error['slot'])
relevant_slots = list(relevent_slots_dict)
row['relevant_slots'] = relevant_slots
if maprows:
row = maprows[0]
blockrows = postgres_connection.query(
# {'transaction_id': 1039639, 'slot': 234765028, 'error': 34, 'count': 1, 'utc_timestamp': datetime.datetime(2023, 12, 8, 18, 29, 23, 861619)}
tx_slots = postgres_connection.query(
"""
SELECT * FROM (
SELECT
tx_slot.slot,
err.error_text
FROM banking_stage_results_2.transaction_slot tx_slot
INNER JOIN banking_stage_results_2.errors err ON err.error_code=tx_slot.error_code
WHERE transaction_id=%s
""", args=[row["transaction_id"]])
# ordered by slots ascending
relevant_slots = [txslot["slot"] for txslot in tx_slots]
row["relevant_slots"] = relevant_slots
# note: sort order is undefined
accountinfos_per_slot = (
invert_by_slot(
postgres_connection.query(
"""
SELECT
slot,
heavily_writelocked_accounts,
heavily_readlocked_accounts
FROM banking_stage_results.blocks
-- see pg8000 docs for unnest hack
amb.*,
acc.account_key
FROM banking_stage_results_2.accounts_map_blocks amb
INNER JOIN banking_stage_results_2.accounts acc ON acc.acc_id=amb.acc_id
WHERE slot IN (SELECT unnest(CAST(%s as bigint[])))
) AS data
""", args=[relevant_slots])
""", args=[relevant_slots]))
)
wai = []
rai = []
for block_data in blockrows:
hwl = json.loads(block_data['heavily_writelocked_accounts'])
hrl = json.loads(block_data['heavily_readlocked_accounts'])
for writed in row['writelocked_accounts']:
info = {'slot' : block_data['slot'], 'key' : writed['key'] }
acc = list(filter(lambda acc_: acc_['key'] == writed['key'], hwl))
if len(acc) > 1:
print("WARNING: multiple accounts with same key in same block")
if len(acc) > 0:
acc = defaultdict(lambda: 0, acc[0])
info['cu_requested'] = acc['cu_requested']
info['cu_consumed'] = acc['cu_consumed']
info['min_pf'] = acc['min_pf']
info['median_pf'] = acc['median_pf']
info['max_pf'] = acc['max_pf']
else:
info['cu_requested'] = 0
info['cu_consumed'] = 0
info['min_pf'] = 0
info['median_pf'] = 0
info['max_pf'] = 0
wai.append(info)
write_lock_info = dict()
read_lock_info = dict()
for relevant_slot in set(relevant_slots):
accountinfos = accountinfos_per_slot.get(relevant_slot, [])
for readed in row['readlocked_accounts']:
info = {'slot' : block_data['slot'], 'key' : readed['key'] }
acc = list(filter(lambda x: x['key'] == readed['key'],hrl))
if len(acc) > 1:
print("WARNING: multiple accounts with same key in same block")
if len(acc) > 0:
acc = defaultdict(lambda: 0, acc[0])
info['cu_requested'] = acc['cu_requested']
info['cu_consumed'] = acc['cu_consumed']
info['min_pf'] = acc['min_pf']
info['median_pf'] = acc['median_pf']
info['max_pf'] = acc['max_pf']
else:
info['cu_requested'] = 0
info['cu_consumed'] = 0
info['min_pf'] = 0
info['median_pf'] = 0
info['max_pf'] = 0
rai.append(info)
row['write_lock_info'] = invert_by_slot(wai)
row['read_lock_info'] = invert_by_slot(rai)
account_info_expanded = []
for account_info in accountinfos:
prio_fee_data = json.loads(account_info['prioritization_fees_info'])
info = {
'slot': account_info['slot'],
'key': account_info['account_key'],
'is_write_locked': account_info['is_write_locked'],
'cu_requested': account_info['total_cu_requested'],
'cu_consumed': account_info['total_cu_consumed'],
'min_pf': prio_fee_data['min'],
'median_pf': prio_fee_data['med'],
'max_pf': prio_fee_data['max']
}
account_info_expanded.append(info)
account_info_expanded.sort(key=lambda acc: int(acc['cu_consumed']), reverse=True)
write_lock_info[relevant_slot] = [acc for acc in account_info_expanded if acc['is_write_locked'] is True]
read_lock_info[relevant_slot] = [acc for acc in account_info_expanded if acc['is_write_locked'] is False]
row["write_lock_info"] = write_lock_info
row["read_lock_info"] = read_lock_info
return maprows
@ -116,3 +100,5 @@ def invert_by_slot(rows):
for row in rows:
inv_indx[row["slot"]].append(row)
return inv_indx