eth-json-rpc-filters/index.js

223 lines
6.5 KiB
JavaScript

const Mutex = require('await-semaphore').Mutex
const EthQuery = require('ethjs-query')
const createAsyncMiddleware = require('json-rpc-engine/src/createAsyncMiddleware')
const createJsonRpcMiddleware = require('eth-json-rpc-middleware/scaffold')
const LogFilter = require('./log-filter.js')
const BlockFilter = require('./block-filter.js')
const TxFilter = require('./tx-filter.js')
const { intToHex, hexToInt } = require('./hexUtils')
module.exports = createEthFilterMiddleware
function createEthFilterMiddleware({ blockTracker, provider }) {
// ethQuery for log lookups
const ethQuery = new EthQuery(provider)
// create filter collection
let filterIndex = 0
let filters = {}
// create update mutex
const mutex = new Mutex()
const waitForFree = mutexMiddlewareWrapper({ mutex })
const middleware = createJsonRpcMiddleware({
// install filters
eth_newFilter: waitForFree(toFilterCreationMiddleware(newLogFilter)),
eth_newBlockFilter: waitForFree(toFilterCreationMiddleware(newBlockFilter)),
eth_newPendingTransactionFilter: waitForFree(toFilterCreationMiddleware(newPendingTransactionFilter)),
// uninstall filters
eth_uninstallFilter: waitForFree(toAsyncRpcMiddleware(uninstallFilterHandler)),
// checking filter changes
eth_getFilterChanges: waitForFree(toAsyncRpcMiddleware(getFilterChanges)),
eth_getFilterLogs: waitForFree(toAsyncRpcMiddleware(getFilterLogs)),
})
// setup filter updating and destroy handler
const filterUpdater = async ({ oldBlock, newBlock }) => {
if (filters.length === 0) return
// lock update reads
const releaseLock = await mutex.acquire()
try {
// process all filters in parallel
await Promise.all(objValues(filters).map(async (filter) => {
try {
await filter.update({ oldBlock, newBlock })
} catch (err) {
// handle each error individually so filter update errors don't affect other filters
console.error(err)
}
}))
} catch (err) {
// log error so we don't skip the releaseLock
console.error(err)
}
// unlock update reads
releaseLock()
}
// expose filter methods directly
middleware.newLogFilter = newLogFilter
middleware.newBlockFilter = newBlockFilter
middleware.newPendingTransactionFilter = newPendingTransactionFilter
middleware.uninstallFilter = uninstallFilterHandler
middleware.getFilterChanges = getFilterChanges
middleware.getFilterLogs = getFilterLogs
// expose destroy method for cleanup
middleware.destroy = () => {
uninstallAllFilters()
}
return middleware
//
// new filters
//
async function newLogFilter(params) {
const filter = new LogFilter({ provider, ethQuery, params })
const filterIndex = await installFilter(filter)
return filter
}
async function newBlockFilter() {
const filter = new BlockFilter({ provider, ethQuery })
const filterIndex = await installFilter(filter)
return filter
}
async function newPendingTransactionFilter() {
const filter = new TxFilter({ provider, ethQuery })
const filterIndex = await installFilter(filter)
return filter
}
//
// get filter changes
//
async function getFilterChanges(filterIndexHex) {
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
if (!filter) {
throw new Error('No filter for index "${filterIndex}"')
}
const results = filter.getChangesAndClear()
return results
}
async function getFilterLogs(filterIndexHex) {
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
if (!filter) {
throw new Error('No filter for index "${filterIndex}"')
}
const results = filter.getAllResults()
return results
}
//
// remove filters
//
async function uninstallFilterHandler(filterIndexHex) {
// check filter exists
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
const result = Boolean(filter)
// uninstall filter
if (result) {
await uninstallFilter(filterIndex)
}
return result
}
//
// utils
//
async function installFilter(filter) {
const prevFilterCount = objValues(filters).length
// install filter
const currentBlock = await blockTracker.getLatestBlock()
await filter.initialize({ currentBlock })
filterIndex++
filters[filterIndex] = filter
filter.id = filterIndex
filter.idHex = intToHex(filterIndex)
// update block tracker subs
const newFilterCount = objValues(filters).length
updateBlockTrackerSubs({ prevFilterCount, newFilterCount })
return filterIndex
}
async function uninstallFilter(filterIndex) {
const prevFilterCount = objValues(filters).length
delete filters[filterIndex]
// update block tracker subs
const newFilterCount = objValues(filters).length
updateBlockTrackerSubs({ prevFilterCount, newFilterCount })
}
async function uninstallAllFilters() {
const prevFilterCount = objValues(filters).length
filters = {}
// update block tracker subs
updateBlockTrackerSubs({ prevFilterCount, newFilterCount: 0 })
}
function updateBlockTrackerSubs({ prevFilterCount, newFilterCount }) {
// subscribe
if (prevFilterCount === 0 && newFilterCount > 0) {
blockTracker.on('sync', filterUpdater)
return
}
// unsubscribe
if (prevFilterCount > 0 && newFilterCount === 0) {
blockTracker.removeListener('sync', filterUpdater)
return
}
}
}
// helper for turning filter constructors into rpc middleware
function toFilterCreationMiddleware(createFilterFn) {
return toAsyncRpcMiddleware(async (...args) => {
const filter = await createFilterFn(...args)
const result = intToHex(filter.id)
return result
})
}
// helper for pulling out req.params and setting res.result
function toAsyncRpcMiddleware(asyncFn) {
return createAsyncMiddleware(async (req, res) => {
const result = await asyncFn.apply(null, req.params)
res.result = result
})
}
function mutexMiddlewareWrapper({ mutex }) {
return (middleware) => {
return async (req, res, next, end) => {
// wait for mutex available
// we can release immediately because
// we just need to make sure updates aren't active
const releaseLock = await mutex.acquire()
releaseLock()
middleware(req, res, next, end)
}
}
}
function objValues(obj, fn){
const values = []
for (let key in obj) {
values.push(obj[key])
}
return values
}