zcash-graphs/zcash_graphs/analysis/analyze.py

161 lines
6.9 KiB
Python

# Copyright (c) 2022 The Zcash developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or https://www.opensource.org/licenses/mit-license.php .
"""Simple Transaction Analysis
This contains a class, `Analysis`, for defining analyses of the blocks and
transactions on the blockchain. It also contains a class `Analyzer` with a
method `analyze_blocks`, which handles applying multiple analyses simultaneously
over some common range of blocks.
"""
import datetime
import itertools
import math
import progressbar
from slickrpc.rpc import Proxy
class Analysis:
"""
An analysis collects a single aggregated data structure from the blockchain.
If you had a block and a single tx from that block, you could simply
`my_analysis.aggregate(my_analysis.extract(block, tx))` to generate the stats
for that analysis. However, since we generally want to aggregate across many
transactions in many blocks and also because we usually want to collect
multiple statistics at once (because re-fetching blocks and tx is slow),
`extract` and `aggregate are separated out. See `analyze_blocks` for how to
take advantage of this structure.
"""
def __init__(self, name, tx_filter, bucketers, extractor, cache = ((), lambda c, _: c), preCache = 0):
"""It takes various functions to apply to the transactions therein. The functions are typed as follows:
tx_filter :: cache -> Block -> Tx -> Boolean
bucketers :: [ ...,
(cache -> Block -> Tx -> k_n-2, [(k_n-1, a)] -> b),
(cache -> Block -> Tx -> k_n-1, [(k_n, a)] -> b),
(cache -> Block -> Tx -> k_n, [v] -> a)
]
extractor :: cache -> Block -> Tx -> v
cache :: (cache, cache -> Block -> cache)
preCache = Natural
`tx_filter` decides whether the given transaction should be included in the
result,
`extractor` reduces each transaction to the parts we care about in the
results,
`bucketers` is a list of pairs of functions -- the first of each pair
produces a key for bucketing the results and the second is how
to accumulate the values in that bucket. The list allows us to
create buckets of buckets.
`cache`, if provided, is a tuple of an initial cache value and a function to
update it so that later transactions can look at information from
previous blocks.
`preCache` is how many blocks before the start of our range to start
caching. This is generally a _minimum_, don't be suprised if the
cache is updated from some much earlier point. Also, it may be
truncated if there aren't enough blocks between the beginning of
the chain and and the start of the range.
If no bucketers are provided, this returns a list of all the extracted data
in a list, one for each transaction. If there are bucketers, it returns a
map, with the keys from the first bucketer in the list and the values from
the first accumulator in the list.
"""
self.name = name
self.__filter = tx_filter
self.__bucketers = bucketers
self.__extractor = extractor
(self.__cache, self.__cacheUpdater) = cache
self.preCache = preCache
self.__lastCachedBlock = 0
def updateCache(self, block):
"""
This is exposed in order to handle the "precache", where we need to
build up the cache for blocks before the blocks we actually care to have
in our results.
"""
if block['height'] > self.__lastCachedBlock:
self.__cache = self.__cacheUpdater(self.__cache, block)
self.__lastCachedBlock = block['height']
def extract(self, block, tx):
"""
Extracts all the data from a given transaction (and its block) needed to
compute the statistics for this analysis.
TODO: Allow a bucketer to return multiple keys. This hopefully allows
things like sub-transaction extraction. E.g., looking at the sizes
of all vouts by day, without caring which ones are in the same tx
TODO: Distinguish between streamable and non-streamable analyses. The
difference is that a streamable analysis has an outermost bucketer
where duplicate keys are adjacent (much like POSIX `uniq`).
"""
self.updateCache(block)
if self.__filter(self.__cache, block, tx):
value = self.__extractor(self.__cache, block, tx)
keys = [x[0](self.__cache, block, tx) for x in self.__bucketers]
return [(keys, value)]
else:
return []
def aggregate(self, kvs):
"""
Given a `[([k_0, k_1, ..., k_n-1], v)]` (where `n` is the length of the
bucketer list provided at initialization and `k_*` are the results of
each bucketer), this groups and accumulates the results, returning their
final form.
"""
kvs.sort(key=lambda x: x[0])
return self.__group(kvs, [x[1] for x in self.__bucketers])
def __group(self, kvs, accumulators):
if accumulators:
buck = []
accum, *remaining_accum = accumulators
for k, g in itertools.groupby(kvs, lambda x: x[0].pop(0)):
buck.append((k, accum(self.__group(list(g), remaining_accum))))
return buck
else:
return [x[1] for x in kvs]
class Analyzer:
def __init__(self, node_url):
self.node = Proxy(node_url)
def analyze_blocks(self, block_range, analyses):
"""
This function executes multiple analyses over a common range of blocks,
returning results keyed by the name of the analysis.
"""
current_height = self.node.getblockchaininfo()['estimatedheight']
bounded_range = range(
max(0, min(block_range[0], current_height)),
max(0, min(block_range[1], current_height))
)
longest_precache = max([x.preCache for x in analyses])
data_start = bounded_range[0]
bar = progressbar.ProgressBar(widgets=['Building Cache ', progressbar.Bar()])
for i in bar(range(max(0, data_start - longest_precache), data_start)):
[x.updateCache(self.node.getblock(str(i), 2)) for x in analyses]
bucketses = [(x, []) for x in analyses]
bar = progressbar.ProgressBar(widgets=['Processing Blocks', progressbar.Bar()])
for block_height in bar(block_range):
block = self.node.getblock(str(block_height), 2)
for tx in block['tx']:
for analysis in analyses:
dict(bucketses)[analysis].extend(analysis.extract(block, tx))
result = []
bar = progressbar.ProgressBar(widgets=['Running Analyses ', progressbar.Bar()])
for analysis in bar(analyses):
result.append((analysis.name, analysis.aggregate(dict(bucketses)[analysis])))
return result