asyncified filters
This commit is contained in:
parent
61f1ba6dda
commit
ddafe0027a
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -93,8 +93,7 @@ web3.eth.filter = function (fil, eventParams, options, formatter) {
|
|||
return fil(eventParams, options);
|
||||
}
|
||||
|
||||
// what outputLogFormatter? that's wrong
|
||||
//return new Filter(fil, watches.eth(), formatters.outputLogFormatter);
|
||||
// output logs works for blockFilter and pendingTransaction filters?
|
||||
return new Filter(fil, watches.eth(), formatter || formatters.outputLogFormatter);
|
||||
};
|
||||
/*jshint maxparams:3 */
|
||||
|
|
|
@ -75,6 +75,7 @@ var getOptions = function (options) {
|
|||
};
|
||||
|
||||
var Filter = function (options, methods, formatter) {
|
||||
var self = this;
|
||||
var implementation = {};
|
||||
methods.forEach(function (method) {
|
||||
method.attachToObject(implementation);
|
||||
|
@ -83,51 +84,69 @@ var Filter = function (options, methods, formatter) {
|
|||
this.implementation = implementation;
|
||||
this.callbacks = [];
|
||||
this.formatter = formatter;
|
||||
this.filterId = this.implementation.newFilter(this.options);
|
||||
this.implementation.newFilter(this.options, function(error, id){
|
||||
if(error)
|
||||
self.filterError = error;
|
||||
else
|
||||
self.filterId = id;
|
||||
});
|
||||
};
|
||||
|
||||
Filter.prototype.watch = function (callback) {
|
||||
this.callbacks.push(callback);
|
||||
var self = this;
|
||||
|
||||
var onMessage = function (error, messages) {
|
||||
if (error) {
|
||||
return self.callbacks.forEach(function (callback) {
|
||||
callback(error);
|
||||
});
|
||||
}
|
||||
// check inf an interval of 10ms if the filter id has arrived
|
||||
var intervalId = setInterval(function(){
|
||||
|
||||
messages.forEach(function (message) {
|
||||
message = self.formatter ? self.formatter(message) : message;
|
||||
self.callbacks.forEach(function (callback) {
|
||||
callback(null, message);
|
||||
});
|
||||
});
|
||||
};
|
||||
if(self.filterId || self.filterError)
|
||||
clearInterval(intervalId);
|
||||
|
||||
// call getFilterLogs on start
|
||||
if (!utils.isString(this.options)) {
|
||||
this.get(function (err, messages) {
|
||||
// don't send all the responses to all the watches again... just to this one
|
||||
if (err) {
|
||||
callback(err);
|
||||
if(self.filterError || !self.filterId)
|
||||
return;
|
||||
|
||||
self.callbacks.push(callback);
|
||||
|
||||
var onMessage = function (error, messages) {
|
||||
if (error) {
|
||||
return self.callbacks.forEach(function (callback) {
|
||||
callback(error);
|
||||
});
|
||||
}
|
||||
|
||||
messages.forEach(function (message) {
|
||||
callback(null, message);
|
||||
message = self.formatter ? self.formatter(message) : message;
|
||||
self.callbacks.forEach(function (callback) {
|
||||
callback(null, message);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
RequestManager.getInstance().startPolling({
|
||||
method: this.implementation.poll.call,
|
||||
params: [this.filterId],
|
||||
}, this.filterId, onMessage, this.stopWatching.bind(this));
|
||||
// call getFilterLogs on start
|
||||
if (!utils.isString(self.options)) {
|
||||
self.get(function (err, messages) {
|
||||
// don't send all the responses to all the watches again... just to self one
|
||||
if (err) {
|
||||
callback(err);
|
||||
}
|
||||
|
||||
messages.forEach(function (message) {
|
||||
callback(null, message);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
RequestManager.getInstance().startPolling({
|
||||
method: self.implementation.poll.call,
|
||||
params: [self.filterId],
|
||||
}, self.filterId, onMessage, self.stopWatching.bind(self));
|
||||
|
||||
}, 10);
|
||||
};
|
||||
|
||||
Filter.prototype.stopWatching = function () {
|
||||
RequestManager.getInstance().stopPolling(this.filterId);
|
||||
this.implementation.uninstallFilter(this.filterId);
|
||||
// remove async
|
||||
this.implementation.uninstallFilter(this.filterId, function(){});
|
||||
this.callbacks = [];
|
||||
};
|
||||
|
||||
|
|
|
@ -29,11 +29,11 @@ var eth = function () {
|
|||
|
||||
switch(type) {
|
||||
case 'latest':
|
||||
args.pop();
|
||||
args.shift();
|
||||
this.params = 0;
|
||||
return 'eth_newBlockFilter';
|
||||
case 'pending':
|
||||
args.pop();
|
||||
args.shift();
|
||||
this.params = 0;
|
||||
return 'eth_newPendingTransactionFilter';
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue