diff --git a/lib/emailservice.js b/lib/emailservice.js index e5ad0d8..aa6e994 100644 --- a/lib/emailservice.js +++ b/lib/emailservice.js @@ -204,58 +204,69 @@ EmailService.prototype.sendEmail = function(notification, cb) { self._getRecipientsList(notification, emailType, function(err, recipientsList) { if (_.isEmpty(recipientsList)) return cb(); - async.waterfall([ + // TODO: Optimize so one process does not have to wait until all others are done + // Instead set a flag somewhere in the db to indicate that this process is free + // to serve another request. + self.lock.runLocked('email-' + notification.id, cb, function(cb) { + self.storage.fetchEmailByNotification(notification.id, function(err, email) { + if (err) return cb(err); + if (email) return cb(); - function(next) { - async.parallel([ + async.waterfall([ function(next) { - self._readTemplate(emailType.filename, next); + async.parallel([ + + function(next) { + self._readTemplate(emailType.filename, next); + }, + function(next) { + self._getDataForTemplate(notification, next); + }, + ], function(err, res) { + next(err, res[0], res[1]); + }); }, - function(next) { - self._getDataForTemplate(notification, next); + function(template, data, next) { + self._applyTemplate(template, data, next); }, - ], function(err, res) { - next(err, res[0], res[1]); + function(content, next) { + async.map(recipientsList, function(recipient, next) { + var email = Model.Email.create({ + walletId: notification.walletId, + copayerId: recipient.copayerId, + from: self.from, + to: recipient.emailAddress, + subject: content.subject, + body: content.body, + notificationId: notification.id, + }); + self.storage.storeEmail(email, function(err) { + return next(err, email); + }); + }, next); + }, + function(emails, next) { + async.each(emails, function(email, next) { + self._send(email, function(err) { + if (err) { + email.setFail(); + } else { + email.setSent(); + } + self.storage.storeEmail(email, next); + }); + }, function(err) { + return next(); + }); + }, + ], function(err) { + if (err) { + log.error('An error ocurred generating email notification', err); + } + return cb(err); }); - }, - function(template, data, next) { - self._applyTemplate(template, data, next); - }, - function(content, next) { - async.map(recipientsList, function(recipient, next) { - var email = Model.Email.create({ - walletId: notification.walletId, - copayerId: recipient.copayerId, - from: self.from, - to: recipient.emailAddress, - subject: content.subject, - body: content.body, - }); - self.storage.storeEmail(email, function(err) { - return next(err, email); - }); - }, next); - }, - function(emails, next) { - async.each(emails, function(email, next) { - self._send(email, function(err) { - if (err) { - email.setFail(); - } else { - email.setSent(); - } - self.storage.storeEmail(email, next); - }); - }, function(err) { - return next(); - }); - }, - ], function(err) { - if (err) { - log.error('An error ocurred generating email notification', err); - } - return cb(err); + }); }); }); }; diff --git a/lib/model/email.js b/lib/model/email.js index 2d30f07..63140bb 100644 --- a/lib/model/email.js +++ b/lib/model/email.js @@ -24,6 +24,7 @@ Email.create = function(opts) { x.status = 'pending'; x.attempts = 0; x.lastAttemptOn = null; + x.notificationId = opts.notificationId; return x; }; @@ -41,6 +42,7 @@ Email.fromObj = function(obj) { x.status = obj.status; x.attempts = obj.attempts; x.lastAttemptOn = obj.lastAttemptOn; + x.notificationId = obj.notificationId; return x; }; diff --git a/lib/storage.js b/lib/storage.js index ead5b29..19eee01 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -52,6 +52,9 @@ Storage.prototype._createIndexes = function() { this.db.collection(collections.ADDRESSES).createIndex({ address: 1, }); + this.db.collection(collections.EMAIL_QUEUE).createIndex({ + notificationId: 1, + }); }; Storage.prototype.connect = function(opts, cb) { @@ -407,6 +410,16 @@ Storage.prototype.fetchUnsentEmails = function(cb) { }); }; +Storage.prototype.fetchEmailByNotification = function(notificationId, cb) { + this.db.collection(collections.EMAIL_QUEUE).findOne({ + notificationId: notificationId, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + + return cb(null, Model.Email.fromObj(result)); + }); +}; Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; diff --git a/test/integration/server.js b/test/integration/server.js index 36a3f7c..78b759e 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -497,10 +497,10 @@ describe('Wallet service', function() { }); }); - it.only('should support multiple emailservice instances running concurrently', function(done) { + it('should support multiple emailservice instances running concurrently', function(done) { var emailService2 = new EmailService(); emailService2.start({ - lockOpts: {}, + lock: emailService.lock, // Use same locker service messageBroker: server.messageBroker, storage: storage, mailer: mailerStub,