eth-json-rpc-filters/subscriptionManager.js

130 lines
3.8 KiB
JavaScript

const SafeEventEmitter = require('safe-event-emitter')
const createScaffoldMiddleware = require('eth-json-rpc-middleware/scaffold')
const createAsyncMiddleware = require('json-rpc-engine/src/createAsyncMiddleware')
const createFilterMiddleware = require('./index.js')
const { unsafeRandomBytes, incrementHexInt } = require('./hexUtils.js')
const getBlocksForRange = require('./getBlocksForRange.js')
module.exports = createSubscriptionMiddleware
function createSubscriptionMiddleware({ blockTracker, provider }) {
// state and utilities for handling subscriptions
const subscriptions = {}
const filterManager = createFilterMiddleware({ blockTracker, provider })
// create subscriptionManager api object
const events = new SafeEventEmitter()
const middleware = createScaffoldMiddleware({
eth_subscribe: createAsyncMiddleware(subscribe),
eth_unsubscribe: createAsyncMiddleware(unsubscribe),
})
return { events, middleware }
async function subscribe(req, res) {
const subscriptionType = req.params[0]
// subId is 16 byte hex string
const subId = unsafeRandomBytes(16)
// create sub
let sub
switch (subscriptionType) {
case 'newHeads':
sub = createSubNewHeads({ subId })
break
case 'logs':
const filterParams = req.params[1]
const filter = await filterManager.newLogFilter(filterParams)
sub = createSubFromFilter({ subId, filter })
break
default:
throw new Error(`SubscriptionManager - unsupported subscription type "${subscriptionType}"`)
}
subscriptions[subId] = sub
res.result = subId
return
function createSubNewHeads({ subId }) {
const sub = {
type: subscriptionType,
destroy: async () => {
blockTracker.removeListener('sync', sub.update)
},
update: async ({ oldBlock, newBlock }) => {
// for newHeads
const toBlock = newBlock
const fromBlock = incrementHexInt(oldBlock)
const rawBlocks = await getBlocksForRange({ provider, fromBlock, toBlock })
const results = rawBlocks.map(normalizeBlock)
results.forEach((value) => {
_emitSubscriptionResult(subId, value)
})
}
}
// check for subscription updates on new block
blockTracker.on('sync', sub.update)
return sub
}
function createSubFromFilter({ subId, filter }){
filter.on('update', result => _emitSubscriptionResult(subId, result))
const sub = {
type: subscriptionType,
destroy: async () => {
return await filterManager.uninstallFilter(filter.idHex)
},
}
return sub
}
}
async function unsubscribe(req, res) {
const id = req.params[0]
const subscription = subscriptions[id]
// if missing, return "false" to indicate it was not removed
if (!subscription) {
res.result = false
return
}
// cleanup subscription
delete subscriptions[id]
await subscription.destroy()
res.result = true
}
function _emitSubscriptionResult(filterIdHex, value) {
events.emit('notification', {
jsonrpc: '2.0',
method: 'eth_subscription',
params: {
subscription: filterIdHex,
result: value,
},
})
}
}
function normalizeBlock(block) {
return {
hash: block.hash,
parentHash: block.parentHash,
sha3Uncles: block.sha3Uncles,
miner: block.miner,
stateRoot: block.stateRoot,
transactionsRoot: block.transactionsRoot,
receiptsRoot: block.receiptsRoot,
logsBloom: block.logsBloom,
difficulty: block.difficulty,
number: block.number,
gasLimit: block.gasLimit,
gasUsed: block.gasUsed,
nonce: block.nonce,
mixHash: block.mixHash,
timestamp: block.timestamp,
extraData: block.extraData,
}
}