lock emailservice instances

This commit is contained in:
Ivan Socolsky 2015-06-09 18:22:06 -03:00
parent 9486f57288
commit 23cca5a948
4 changed files with 74 additions and 48 deletions

View File

@ -204,6 +204,14 @@ EmailService.prototype.sendEmail = function(notification, cb) {
self._getRecipientsList(notification, emailType, function(err, recipientsList) {
if (_.isEmpty(recipientsList)) return cb();
// TODO: Optimize so one process does not have to wait until all others are done
// Instead set a flag somewhere in the db to indicate that this process is free
// to serve another request.
self.lock.runLocked('email-' + notification.id, cb, function(cb) {
self.storage.fetchEmailByNotification(notification.id, function(err, email) {
if (err) return cb(err);
if (email) return cb();
async.waterfall([
function(next) {
@ -231,6 +239,7 @@ EmailService.prototype.sendEmail = function(notification, cb) {
to: recipient.emailAddress,
subject: content.subject,
body: content.body,
notificationId: notification.id,
});
self.storage.storeEmail(email, function(err) {
return next(err, email);
@ -258,6 +267,8 @@ EmailService.prototype.sendEmail = function(notification, cb) {
return cb(err);
});
});
});
});
};
module.exports = EmailService;

View File

@ -24,6 +24,7 @@ Email.create = function(opts) {
x.status = 'pending';
x.attempts = 0;
x.lastAttemptOn = null;
x.notificationId = opts.notificationId;
return x;
};
@ -41,6 +42,7 @@ Email.fromObj = function(obj) {
x.status = obj.status;
x.attempts = obj.attempts;
x.lastAttemptOn = obj.lastAttemptOn;
x.notificationId = obj.notificationId;
return x;
};

View File

@ -52,6 +52,9 @@ Storage.prototype._createIndexes = function() {
this.db.collection(collections.ADDRESSES).createIndex({
address: 1,
});
this.db.collection(collections.EMAIL_QUEUE).createIndex({
notificationId: 1,
});
};
Storage.prototype.connect = function(opts, cb) {
@ -407,6 +410,16 @@ Storage.prototype.fetchUnsentEmails = function(cb) {
});
};
Storage.prototype.fetchEmailByNotification = function(notificationId, cb) {
this.db.collection(collections.EMAIL_QUEUE).findOne({
notificationId: notificationId,
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Email.fromObj(result));
});
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;

View File

@ -497,10 +497,10 @@ describe('Wallet service', function() {
});
});
it.only('should support multiple emailservice instances running concurrently', function(done) {
it('should support multiple emailservice instances running concurrently', function(done) {
var emailService2 = new EmailService();
emailService2.start({
lockOpts: {},
lock: emailService.lock, // Use same locker service
messageBroker: server.messageBroker,
storage: storage,
mailer: mailerStub,