solana-bankingstage-dashboard/postgres_connection.py

75 lines
2.9 KiB
Python

import pg8000
from dbutils.pooled_db import PooledDB
import ssl
import time
from os import environ
# keep default threadsafety
# If the underlying DB-API module is not thread-safe,
# thread locks will be used to ensure that the pooled_db connections are thread-safe.
# So you don't need to worry about that, but you should be careful to use dedicated
# connections whenever you change the database session or perform transactions spreading over more than one SQL command.
def _configure_sslcontext():
if environ.get('PGSSL', 'false') == 'true':
ssl_context = ssl.create_default_context()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = False
ssl_context.load_verify_locations("ca.cer")
ssl_context.load_cert_chain("client.cer", keyfile="client-key.cer")
return ssl_context
else:
return None
# see https://webwareforpython.github.io/DBUtils/main.html#pooleddb-pooled-db
def _init_pool():
print("Setting up database connection pool...")
pool_size = int(environ.get('POOLED_DB_MAX_SIZE', '4'))
username = environ.get('PGUSER', 'mev_dashboard_query_user')
password = environ.get('PGPASSWORD')
assert password is not None, "PGPASSWORD environment variable must be set"
host = environ.get('PGHOST', 'localhost')
port = environ.get('PGPORT', '5432')
database = environ.get('PGDATABASE', 'mangolana')
ssl_context = _configure_sslcontext()
if ssl_context is not None:
print("... use SSL for database connection")
application_name = "bankingstage-dashboard"
timeout = 10
# note: for some unknown reason, database sees maxconnections+1 connections
the_pool = PooledDB(pg8000, maxconnections=pool_size, blocking=True, maxusage=100,
database=database, user=username, password=password, host=host, port=port,
application_name=application_name, timeout=timeout, ssl_context=ssl_context)
print("Initialized database connection pool with size ", pool_size)
return the_pool
pool = _init_pool()
def query(statement, args=[]):
start = time.time()
elapsed_connect = time.time() - start
with pool.connection() as db:
with db.cursor() as cursor:
try:
cursor.execute(statement, args=args)
elapsed_total_ms = (time.time() - start) * 1000
keys = [k[0] for k in cursor.description]
maprows = [dict(zip(keys, row)) for row in cursor]
except Exception as ex:
print("Exception executing statement:", ex, statement)
raise ex
if elapsed_total_ms > 400:
print("SLOW Database Query took", "%.2f" % elapsed_total_ms, "ms", "(conn=" + "%.2f" % elapsed_connect + "ms)")
else:
print("Database Query took", "%.2f" % elapsed_total_ms, "ms", "(conn=" + "%.2f" % elapsed_connect + "ms)")
return maprows